Compare commits

..

28 Commits

Author SHA1 Message Date
6145a23296 added border to input box 2026-03-12 08:59:53 -05:00
16d79df421 fix: empty response handling, /no_think model gating, per-model profiles
- Detect empty LLM responses (no content, no tool calls) instead of
  silently treating them as task completion. Retries once without tools
  before warning the user.
- Gate /no_think system message and chat_template_kwargs to Qwen/QwQ
  models only — sending /no_think to llama3.x caused empty responses.
- Add model_profiles config section for per-model overrides (token
  budget, thinking, temperature, max_tokens) matched by name prefix.
  Applied at startup and on /model switch.
- Update SessionManager on /model switch so session files record the
  correct model.
- Add NDJSON fallback in SSE stream parser for Ollama compatibility.
- Improve read_file error to suggest find_files on FileNotFoundError.
- Add diagnostic logging for empty streams and empty results.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 23:09:04 -05:00
1ee721ac10 Merge branch 'bugfix/model-command-regex' 2026-03-11 22:31:50 -05:00
d54a3480b8 fix: /model command no longer incorrectly matched as /mode
Use exact first-word matching instead of startswith() for /models and
/mode command routing. Accept /model as an alias for /models.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 22:31:45 -05:00
d3b286ba40 Merge branch 'feature/file-cache-lru' 2026-03-11 22:26:56 -05:00
d829e6553c feat: add tool-level file cache with LRU eviction and mtime invalidation
Introduces FileCache (OrderedDict LRU, st_mtime_ns validation) to avoid
redundant disk reads and duplicate content in conversation context.
Read tools return a short "[Cached]" message on cache hit instead of
resending unchanged file content, saving tokens. Write/edit/delete tools
invalidate affected paths; str_replace pre-warms the cache after edits.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 22:26:51 -05:00
2c532adbbc Merge branch 'feature/read-many-files-and-fixes' 2026-03-11 21:57:39 -05:00
be1ea81102 fix: preserve streaming UI callbacks across agent loop iterations
StreamHandler.reset() was clearing on_content, on_thinking, and on_done
callbacks after every LLM response, but they were only set once per turn.
This caused the thinking indicator and streaming display to stop working
after the first tool call in a multi-step agent turn.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 21:56:59 -05:00
3fe0f7af47 style: change header background from orange to dark cyan
Improves readability of the lightning bolt emoji and gives the header
a distinctive look.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 21:56:57 -05:00
05754fe06b feat: add read_many_files tool for batch file reading
Reduces LLM round-trips by allowing multiple files to be read in a
single tool call. Uses best-effort error handling so partial failures
still return successful reads.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 21:56:53 -05:00
0886727437 Merge branch 'feature/header-panel-agent-modes' 2026-03-11 21:36:27 -05:00
638aecb561 feat: add custom HeaderPanel widget and switchable agent modes
Replace built-in Header with a custom HeaderPanel showing model name,
mode badge, and live token usage. Add AgentMode enum (normal/plan/auto)
with mode-aware permission gating — Plan mode restricts to read-only
tools, Auto mode auto-approves everything. Includes /mode slash command
and Ctrl+P keybinding to cycle modes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 21:36:23 -05:00
b878408f3e Merge branch 'fix/remove-cat-allowed-command' 2026-03-11 21:15:05 -05:00
5b5c3098bb fix: remove cat from allowed shell commands and fix stale display tests
Remove cat from allowed_commands to close shell redirect bypass vector
(read_file provides safer alternative). Update display tests to match
render_user_message returning Text instead of Panel, and shell test to
use head instead of cat.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 21:14:21 -05:00
4e3da84578 Merge branch 'fix/display-bug-and-shell-write-bypass' 2026-03-11 20:58:55 -05:00
2ad3df521d fix: display assistant content with finish tool call + block shell write redirects
Bug 1: Assistant text content was silently dropped when the LLM response
included both content and tool calls (e.g. finish with a summary). Now
content is displayed before tool call execution regardless.

Bug 2: Shell redirect operators (>, >>, <<) allowed bypassing file-write
permissions when the base command (e.g. cat) was in the allowed list.
Redirects now require explicit user approval in permissions, and the
shell tool itself blocks them as defense-in-depth.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 20:58:07 -05:00
4496fce354 docs: update README with full config reference, remove completed roadmap
Expands README with complete config.yaml reference, CLI options table,
skills documentation, and updated command list. Removes the old roadmap
(all phases complete). Updates tweaks.md with current design notes.
Adds .sneakycode/ to .gitignore and includes superpowers design specs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 20:05:16 -05:00
133bcbda57 feat: re-echo condensed user prompt in chat log
Shows user input as a compact one-liner ("You: prompt text") instead of
a full panel. Multi-line input is collapsed to the first line with a
"(+N lines)" suffix, and long lines are truncated at 120 chars.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 20:05:10 -05:00
7705008b9c fix: prevent /no_think tag from leaking into user message content
The /no_think directive was appended directly to the user's message text,
causing the LLM to interpret it as user input (e.g., referencing it as a
directory path). Now injected as a separate system message after the last
user message. Also adds which, jq, type, and file to shell allowed_commands
(were mistakenly placed in permissions.auto_approve).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 20:04:47 -05:00
9273d14845 Merge branch 'feature/disable-thinking-mode' 2026-03-11 19:36:49 -05:00
f0d8ef8f0a feat: add thinking mode toggle to suppress reasoning-only response loops
Adds `llm.thinking` config option (default: true) that when disabled:
- Injects /no_think into the last user message for Qwen 3.x compatibility
- Sends chat_template_kwargs in API payload for backends that support it
- Silently and immediately nudges on reasoning-only responses instead of
  showing warnings and wasting retry iterations

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 19:34:36 -05:00
25fa7dc82b Merge branch 'feature/llm-extra-body-config' 2026-03-11 19:15:39 -05:00
220c6613e4 feat: add extra_body config for model-specific API parameters
Allows passing arbitrary parameters (e.g., enable_thinking, reasoning_effort)
to the LLM API request body via config.yaml, solving reasoning-only response
loops with models like Qwen 3.x without requiring code changes per model.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 19:15:35 -05:00
22f10cd8e9 Merge branch 'feature/structured-skill-packages' 2026-03-11 19:06:13 -05:00
2ae8294e29 feat: structured skill packages with config overrides, chaining, and TUI integration
Add a skill package system where each skill is a directory with a skill.yaml
manifest and prompt markdown files. Skills support /command triggers, scoped
config overrides (temperature, max_tokens, tool filtering), chain dependencies
with cycle-safe resolution, and a finish_skill completion signal.

Includes four built-in skills: explore, brainstorm, write-document, and plan.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 19:06:05 -05:00
26bcbc6c1f Merge branch 'fix/markdown-rendering-and-reasoning-retry' 2026-03-11 18:21:58 -05:00
cc03f76593 fix: render markdown in assistant panels and speed up reasoning-only recovery
Assistant message panels now use Markdown() instead of raw strings, so
bold/italic/lists render properly. Also nudge the model immediately after
tool errors instead of wasting 2 retry iterations in reasoning-only mode.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 18:21:51 -05:00
90a38f12d1 fix: wrap session resume in @work to resolve NoActiveWorker crash
push_screen_wait requires a worker context, but on_mount is a plain
lifecycle callback. Extract session resume logic into a @work-decorated
method so the modal can be awaited without triggering NoActiveWorker.
2026-03-11 15:52:28 -05:00
40 changed files with 4179 additions and 266 deletions

3
.gitignore vendored
View File

@@ -34,3 +34,6 @@ htmlcov/
# Worktrees
.worktrees/
# SneakyCode local data
.sneakycode/

View File

@@ -0,0 +1,40 @@
# Brainstorm Skill
You are in **brainstorming mode**. Your goal is creative ideation — generating multiple approaches, exploring trade-offs, and helping the user think through possibilities before committing to an implementation.
## Process
1. **Clarify the goal**: Make sure you understand what the user wants to achieve. Ask clarifying questions if needed.
2. **Divergent thinking**: Generate at least 3 distinct approaches. Push beyond the obvious — include creative or unconventional options.
3. **Evaluate trade-offs**: For each approach, identify:
- Pros and cons
- Complexity and effort estimate (low / medium / high)
- Risk factors
- What it enables or prevents in the future
4. **Synthesize**: Recommend your top pick with reasoning, but present all options fairly.
5. **Refine**: Ask the user which direction appeals to them and iterate.
## Guidelines
- Read relevant code first to ground your suggestions in reality (the explore skill has already run if chained).
- Don't just list options — explain *why* each one is interesting or viable.
- Be bold. Brainstorming is the place for ambitious ideas.
- If the user's initial framing seems limiting, gently challenge it.
- Avoid implementation details at this stage — focus on approach and design.
## Output Format
Present options as numbered approaches with clear headings:
### Approach 1: [Name]
[Description, pros, cons, complexity]
### Approach 2: [Name]
[Description, pros, cons, complexity]
### Approach 3: [Name]
[Description, pros, cons, complexity]
**Recommendation**: [Your pick and why]
When brainstorming is complete and the user has chosen a direction, call `finish_skill` summarizing the chosen approach.

View File

@@ -0,0 +1,9 @@
name: brainstorm
description: Creative ideation — divergent thinking, option generation, structured exploration
version: "1.0"
triggers: ["/brainstorm", "/bs"]
config_overrides:
temperature: 1.2
tools_disable: [write_file, make_dir, delete_file, str_replace, patch_apply, run_command]
chain: [explore]
prompts: [prompt.md]

View File

@@ -0,0 +1,31 @@
# Explore Skill
You are in **exploration mode**. Your goal is to deeply understand the codebase or a specific area of it. Do NOT make any changes — only read, search, and analyze.
## Approach
1. **Start broad**: Use `list_dir` and `find_files` to understand the project structure
2. **Trace paths**: Follow imports, function calls, and data flow through the code
3. **Map relationships**: Identify which files depend on which, and how components interact
4. **Read carefully**: Use `read_file` to examine key files in detail
5. **Search patterns**: Use `grep_files` to find usage patterns, implementations, and references
## Output Format
Produce a structured summary with:
- **Architecture overview**: High-level description of the system's structure
- **Key components**: List of important files/classes and their responsibilities
- **Data flow**: How data moves through the system (requests, transformations, storage)
- **Dependencies**: Internal and external dependency map
- **Patterns**: Design patterns, conventions, and idioms used in the codebase
- **Observations**: Anything notable — potential issues, tech debt, clever solutions
## Guidelines
- Be thorough but focused. If the user specified an area, concentrate there.
- Don't guess — read the actual code before making claims.
- Quote specific file paths and line numbers when referencing code.
- If you find something unexpected or concerning, flag it clearly.
When you have completed your exploration, call `finish_skill` with a brief summary of your findings.

View File

@@ -0,0 +1,9 @@
name: explore
description: Deep codebase exploration — traces paths, maps architecture, summarizes findings
version: "1.0"
triggers: ["/explore", "/ex"]
config_overrides:
temperature: 0.3
tools_disable: [write_file, make_dir, delete_file, str_replace, patch_apply, run_command]
chain: []
prompts: [prompt.md]

View File

@@ -0,0 +1,50 @@
# Plan Skill
You are in **planning mode**. Your goal is to break down a task into a clear, actionable implementation plan. The explore skill has already run (if chained), so you have codebase context.
## Process
1. **Define scope**: Clearly state what the plan covers and what it does not.
2. **Decompose**: Break the task into discrete, ordered steps. Each step should be:
- Small enough to implement in one focused session
- Clear enough that someone unfamiliar could follow it
- Testable — you can verify the step was done correctly
3. **Identify dependencies**: Note which steps depend on others and the critical path.
4. **Map to files**: For each step, list the specific files to create or modify.
5. **Flag risks**: Identify anything that could go wrong, require decisions, or block progress.
## Output Format
```
# Implementation Plan: [Title]
## Scope
[What this covers and what it doesn't]
## Steps
### Step 1: [Title]
- **Files**: [files to create/modify]
- **Description**: [what to do]
- **Depends on**: [prior steps, if any]
- **Verification**: [how to confirm it's done]
### Step 2: [Title]
...
## Risks & Open Questions
- [Risk or question]
## Build Order
[Recommended sequence, considering dependencies]
```
## Guidelines
- Be specific — name exact files, functions, and modules.
- Keep steps granular. "Implement the backend" is too vague. "Add the /api/users endpoint with GET and POST handlers" is good.
- Consider both happy path and error cases in your plan.
- If you need to make assumptions, state them explicitly.
- Use `run_command` if you need to check project state (e.g., installed packages, running services).
When the plan is complete and the user has approved it, call `finish_skill` with a one-line summary.

View File

@@ -0,0 +1,9 @@
name: plan
description: Break down tasks, create roadmaps, plan implementations
version: "1.0"
triggers: ["/plan"]
config_overrides:
temperature: 0.5
tools_disable: [write_file, make_dir, delete_file, str_replace, patch_apply]
chain: [explore]
prompts: [prompt.md]

View File

@@ -0,0 +1,47 @@
# Write Document Skill
You are in **document writing mode**. Your goal is to draft, edit, or improve written documents — READMEs, technical specs, changelogs, guides, or any prose content.
## Workflow
### 1. Understand the Document
- What type of document? (README, spec, changelog, tutorial, etc.)
- Who is the audience? (developers, users, stakeholders)
- What is the desired tone? (formal, casual, technical)
- Are there existing documents to reference or update?
### 2. Outline
Before writing, propose a structure:
- List the main sections
- Note what each section should cover
- Get user approval on the outline before drafting
### 3. Draft
Write the full document based on the approved outline:
- Use clear, concise language
- Follow Markdown formatting conventions
- Include code examples where appropriate
- Be specific — avoid vague statements
### 4. Revise
After the initial draft:
- Check for consistency in tone and terminology
- Verify technical accuracy by reading referenced code
- Ensure all sections from the outline are covered
- Trim unnecessary content
## Document Templates
**README**: Project name, description, installation, usage, configuration, contributing, license
**Technical Spec**: Context, goals, non-goals, design, alternatives considered, implementation plan
**Changelog**: Version, date, categories (Added, Changed, Fixed, Removed)
**Guide/Tutorial**: Prerequisites, step-by-step instructions, examples, troubleshooting
## Guidelines
- Read existing project docs and code to ensure accuracy.
- Match the existing documentation style if updating.
- Prefer concrete examples over abstract descriptions.
- Use the `write_file` tool to save the document when the user approves.
When the document is complete and saved, call `finish_skill` with a summary of what was written.

View File

@@ -0,0 +1,8 @@
name: write-document
description: Draft and edit documents — READMEs, specs, changelogs, prose
version: "1.0"
triggers: ["/write-doc", "/doc"]
config_overrides:
temperature: 0.7
chain: []
prompts: [prompt.md]

122
README.md
View File

@@ -26,31 +26,79 @@ pip install -e ".[dev]"
## Configuration
Edit `config/config.yaml` to configure the agent. Key settings:
Edit `config/config.yaml` to configure the agent. The full configuration reference:
```yaml
llm:
model: "qwen3.5:latest" # Ollama model name
endpoint: "http://localhost:11434" # Ollama endpoint
max_retries: 3 # Retry attempts on transient errors
retry_backoff_base: 1.0 # Exponential backoff base (seconds)
model: "qwen3.5:latest" # Ollama model name
endpoint: "http://localhost:11434" # Ollama endpoint
api_path: "/v1/chat/completions" # API endpoint path
temperature: 0.1 # Sampling temperature
max_tokens: 4096 # Maximum tokens in LLM response
timeout: 120 # Request timeout in seconds
max_retries: 3 # Retry attempts on transient errors
retry_backoff_base: 1.0 # Exponential backoff base (seconds)
retry_backoff_max: 30.0 # Maximum backoff seconds
agent:
max_iterations: 25 # Max tool-call iterations per turn
max_conversation_tokens: 32000 # Token budget for conversation
workspace_root: "." # Project directory for file operations
truncation_keep_recent: 10 # Messages preserved during truncation
truncation_threshold: 0.85 # Budget fraction that triggers truncation
max_iterations: 25 # Max tool-call iterations per turn
max_conversation_tokens: 32000 # Token budget for conversation
workspace_root: "." # Project directory for file operations
truncation_keep_recent: 10 # Messages preserved during truncation
truncation_threshold: 0.85 # Budget fraction that triggers truncation
session:
auto_save: true # Save session after each turn
max_session_age_hours: 72 # Auto-cleanup old sessions
offer_resume: true # Offer to resume on startup
session_dir: ".sneakycode/sessions" # Directory for session files
auto_save: true # Save session after each turn
max_session_age_hours: 72 # Auto-cleanup old sessions
offer_resume: true # Offer to resume on startup
permissions:
auto_approve: [read_file, list_dir, grep_files, find_files, finish]
prompt_user: [write_file, delete_file, run_command, str_replace, patch_apply, make_dir]
deny: []
tools:
shell:
allowed_commands: # Commands the LLM may run
- git
- python
- pip
- pytest
- ruff
- ls
- cat
- head
- tail
- wc
- diff
- grep
- find
- echo
denied_commands: # Blocked commands
- rm -rf /
- sudo
- curl
- wget
max_output_bytes: 65536 # Max captured output size (bytes)
filesystem:
max_file_size_bytes: 1048576 # 1 MB — max file size for read/write
binary_detection: true # Detect and reject binary files
display:
show_tool_calls: true # Show tool call details in output
show_token_usage: true # Show token usage stats
stream_output: true # Stream LLM output to terminal
skills:
enabled: true # Enable the skills system
directories: # Directories to scan for skill files
- ".sneakycode/skills"
debug:
enabled: false # Enable debug logging
log_dir: ".sneakycode/logs" # Debug log directory
max_files: 10 # Max debug log files to retain
```
Environment variable `SNEAKYCODE_CONFIG` can override the config file path.
@@ -58,9 +106,12 @@ Environment variable `SNEAKYCODE_CONFIG` can override the config file path.
## Usage
```bash
# Start the interactive REPL
# Start the interactive TUI
sneakycode
# Open a specific project directory
sneakycode /path/to/project
# Or run directly
python -m app.main
@@ -68,25 +119,38 @@ python -m app.main
sneakycode --config path/to/config.yaml --verbose --log-file sneakycode.log
```
### CLI Options
| Option | Description |
|--------------------------|--------------------------------------------------|
| `DIRECTORY` | Project directory to use as workspace root |
| `--config PATH` | Path to config YAML file (default: `config/config.yaml`) |
| `-v`, `--verbose` | Enable verbose (DEBUG) logging |
| `--log-file PATH` | Path to log file for persistent logging |
### REPL Commands
| Command | Description |
|------------|--------------------------------------|
| `/quit` | Save session and exit |
| `/history` | Show conversation history |
| `/clear` | Clear conversation history |
| `/save` | Manually save session |
| `/session` | Show session info (messages, tokens) |
| Command | Description |
|-------------------|----------------------------------------------------|
| `/help` | Show available commands |
| `/quit` | Save session and exit (also `/exit`, `/bye`) |
| `/history` | Show conversation history |
| `/clear` | Clear conversation history |
| `/save` | Manually save session |
| `/session` | Show session info (messages, tokens, start time) |
| `/models` | List available Ollama models |
| `/models <name>` | Switch to a different model |
| `/skills` | List available skills |
### Session Persistence
Sessions are automatically saved after each agent turn and on exit. On startup, SneakyCode offers to resume the most recent session for the current workspace.
Session files are stored in `.sneakycode/sessions/` within the workspace root.
Session files are stored in `.sneakycode/sessions/` within the workspace root (configurable via `session.session_dir`).
## Available Tools
SneakyCode provides 11 tools across 5 categories. See [docs/tools.md](docs/tools.md) for the full reference.
SneakyCode provides tools across 6 categories. See [docs/tools.md](docs/tools.md) for the full reference.
| Category | Tools | Permission |
|------------|-------------------------------------------------|---------------|
@@ -96,6 +160,17 @@ SneakyCode provides 11 tools across 5 categories. See [docs/tools.md](docs/tools
| Edit | `str_replace`, `patch_apply` | User confirm |
| Shell | `run_command` | User confirm |
| Control | `finish` | Auto-approved |
| Skills | `load_skill` | Auto-approved |
The `load_skill` tool is available when `skills.enabled` is `true` in the config. It allows the LLM to load skill instructions from the configured skill directories.
## Skills
SneakyCode includes a skills system that lets you provide reusable instruction sets to the LLM. Skills are markdown files placed in `.sneakycode/skills/` (or any directory listed in `skills.directories`).
Skills are auto-discovered on startup. The LLM can load them via the `load_skill` tool, and you can list available skills with the `/skills` command.
To create a skill, add a `.md` file to your skills directory with a descriptive filename (e.g., `refactoring.md`). The file content is injected into the conversation when the skill is loaded.
## Development
@@ -121,6 +196,7 @@ app/
├── models/ # Pydantic config and message schemas
├── services/ # LLM client, streaming, permissions, session persistence
├── tools/ # Tool implementations (one file per group)
├── ui/ # Textual TUI application and widgets
└── utils/ # Logging, display, file helpers, token counter
config/
└── config.yaml # Application configuration

View File

@@ -7,7 +7,7 @@ import time
from typing import TYPE_CHECKING, Any
from app.agent.context import SessionContext
from app.models.config import AppConfig
from app.models.config import AgentMode, AppConfig
from app.models.message import Message
from app.models.tool_call import ToolCall, ToolResult, ToolResultStatus
from app.services.llm import LLMClient, LLMConnectionError, LLMError, LLMStreamError
@@ -19,6 +19,7 @@ from app.utils.logging import get_logger
if TYPE_CHECKING:
from app.services.debug_log import DebugLogger
from app.services.skill_runner import SkillRunner
from app.services.skills import SkillsManager
logger = get_logger(__name__)
@@ -45,6 +46,7 @@ class AgentLoop:
display: DisplayAdapter | None = None,
debug_logger: DebugLogger | None = None,
skills_manager: SkillsManager | None = None,
skill_runner: SkillRunner | None = None,
) -> None:
self._config = config
self._ctx = ctx
@@ -55,7 +57,14 @@ class AgentLoop:
self._display = display
self._debug = debug_logger
self._skills = skills_manager
self._skill_runner = skill_runner
self._tools_schema = registry.get_openai_tools_schema()
if self._permissions.mode == AgentMode.PLAN:
read_only = PermissionsService.READ_ONLY_TOOLS
self._tools_schema = [
t for t in self._tools_schema
if t["function"]["name"] in read_only
]
self._system_prompt = self._build_system_prompt()
self._cancelled = False
@@ -81,12 +90,52 @@ class AgentLoop:
)
if self._skills:
prompt += self._skills.get_system_prompt_snippet()
if self._skill_runner and self._skill_runner.is_active:
prompt += (
f"\n\nCurrently active skill: {self._skill_runner.active_skill_name}. "
"When the skill's objective is complete, call the `finish_skill` tool."
)
if self._permissions.mode == AgentMode.PLAN:
prompt += (
"\n\nYou are in PLAN mode. You may only use read-only tools: "
"read_file, list_dir, grep_files, find_files, finish. "
"Do NOT attempt to write files, edit code, or run commands. "
"Instead, describe what changes you would make, which files "
"you would modify, and provide the reasoning for each change."
)
return prompt
# Models whose chat templates understand /no_think directives.
_THINKING_MODEL_PREFIXES = ("qwen", "qwq")
def _model_supports_no_think(self) -> bool:
"""Check if the current model uses a thinking chat template."""
model_lower = self._config.llm.model.lower()
return any(model_lower.startswith(p) for p in self._THINKING_MODEL_PREFIXES)
def _get_messages_with_system_prompt(self) -> list[Message]:
"""Prepend the system prompt to conversation history."""
"""Prepend the system prompt to conversation history.
When thinking is disabled on a model that supports it, appends a
system-level /no_think directive after the last user message so
Qwen 3.x (and similar) chat templates see it.
"""
system_msg = Message(role="system", content=self._system_prompt)
return [system_msg] + self._ctx.get_history()
history = self._ctx.get_history()
if not self._config.llm.thinking and self._model_supports_no_think() and history:
history = list(history)
# Find last user message and insert a system hint after it
for i in range(len(history) - 1, -1, -1):
if history[i].role == "user":
no_think_msg = Message(
role="system",
content="/no_think",
)
history.insert(i + 1, no_think_msg)
break
return [system_msg] + history
async def run_turn(self, user_input: str) -> None:
"""Execute one full agent turn: add user message, loop until done.
@@ -99,6 +148,7 @@ class AgentLoop:
max_iter = self._config.agent.max_iterations
reasoning_only_streak = 0
empty_streak = 0
for iteration in range(1, max_iter + 1):
if self._cancelled:
if self._display:
@@ -153,16 +203,32 @@ class AgentLoop:
reasoning_only_streak += 1
self._ctx.pop_last_message()
if reasoning_only_streak >= _MAX_REASONING_RETRIES:
# Nudge the model by injecting a user hint
if self._display:
# When thinking is disabled, reasoning-only is expected model noise.
# Nudge immediately and silently to avoid wasting iterations.
thinking_disabled = not self._config.llm.thinking
# If the last context messages are tool errors, nudge immediately
# rather than wasting retries — the model is likely confused by the error.
has_recent_tool_error = any(
m.role == "tool" and m.content and m.content.startswith("Unknown ")
for m in self._ctx.get_history()[-3:]
)
should_nudge = (
thinking_disabled
or has_recent_tool_error
or reasoning_only_streak >= _MAX_REASONING_RETRIES
)
if should_nudge:
if not thinking_disabled and self._display:
self._display.write_warning(
f"Model produced reasoning but no response {reasoning_only_streak} times. "
"Nudging model to respond..."
)
self._ctx.add_message(
"user",
"Please respond with your answer. Do not just think — provide your actual response.",
"Please respond with your answer. If a tool call failed, briefly explain what happened and continue.",
)
reasoning_only_streak = 0
else:
@@ -173,10 +239,42 @@ class AgentLoop:
# Successful response — reset streak
reasoning_only_streak = 0
# Detect completely empty response (no content, no tool calls)
if not assistant_msg.content and not assistant_msg.tool_calls:
empty_streak += 1
self._ctx.pop_last_message() # Don't keep empty messages
if empty_streak >= 2:
if self._display:
self._display.write_warning(
"Model returned repeated empty responses — "
"try a different model or check Ollama logs."
)
break
if self._display:
self._display.write_warning("Model returned empty response. Retrying without tools...")
# Retry without tool schemas — some models return empty when
# tools are in the payload but the model can't handle them.
assistant_msg = await self._llm_step(skip_tools=True)
if assistant_msg is None:
break
if assistant_msg.content:
self._ctx.add_message("assistant", assistant_msg.content)
if self._display:
self._display.write_assistant_message(assistant_msg.content)
self._handler.reset()
break
# Still empty even without tools
self._handler.reset()
continue
empty_streak = 0 # reset on successful non-empty response
# Display any assistant text content (even if tool calls follow)
if self._display and assistant_msg.content:
self._display.write_assistant_message(assistant_msg.content)
# No tool calls → task complete (plain text response)
if not assistant_msg.tool_calls:
if self._display and assistant_msg.content:
self._display.write_assistant_message(assistant_msg.content)
break
# Execute tool calls
@@ -192,28 +290,37 @@ class AgentLoop:
name=result.tool_name,
)
# Check if finish tool was called
# Rebuild tools schema and system prompt if skill state may have changed
if any(r.tool_name in ("load_skill", "finish_skill") for r in results):
self._tools_schema = self._registry.get_openai_tools_schema()
self._system_prompt = self._build_system_prompt()
# Check if finish tool was called (finish_skill does NOT break the loop)
if any(r.tool_name == "finish" for r in results):
break
else:
if self._display:
self._display.write_warning(f"Agent reached maximum iterations ({max_iter}). Stopping.")
async def _llm_step(self) -> Message | None:
async def _llm_step(self, *, skip_tools: bool = False) -> Message | None:
"""Stream one LLM response and return the accumulated Message.
Uses retry-enabled streaming. On mid-stream errors, attempts to recover
partial content if available.
Args:
skip_tools: If True, send the request without tool schemas (fallback mode).
Returns:
The assistant Message, or None if an error occurred.
"""
messages = self._get_messages_with_system_prompt()
if self._debug:
self._debug.log_request(messages, self._config.llm.model)
tools = None if skip_tools else self._tools_schema
t0 = time.monotonic()
try:
chunk_iter = self._client.stream_chat_with_retry(messages, tools=self._tools_schema)
chunk_iter = self._client.stream_chat_with_retry(messages, tools=tools)
result = await self._handler.process_stream(chunk_iter)
if result and self._debug:
elapsed = (time.monotonic() - t0) * 1000

View File

@@ -1,12 +1,39 @@
"""Pydantic configuration models mapping to config/config.yaml."""
import os
from enum import StrEnum
from pathlib import Path
from typing import Any
import yaml
from pydantic import BaseModel, Field, model_validator
class AgentMode(StrEnum):
"""Runtime agent mode controlling permission behavior."""
NORMAL = "normal"
PLAN = "plan"
AUTO = "auto"
class ModelProfile(BaseModel):
"""Per-model overrides applied when switching models."""
max_conversation_tokens: int | None = Field(
default=None, description="Token budget override for this model's context window"
)
thinking: bool | None = Field(
default=None, description="Override thinking mode for this model"
)
temperature: float | None = Field(
default=None, description="Override sampling temperature"
)
max_tokens: int | None = Field(
default=None, description="Override max response tokens"
)
class LLMConfig(BaseModel):
"""LLM backend configuration."""
@@ -19,6 +46,14 @@ class LLMConfig(BaseModel):
max_retries: int = Field(default=3, description="Max retry attempts on transient errors")
retry_backoff_base: float = Field(default=1.0, description="Base seconds for exponential backoff")
retry_backoff_max: float = Field(default=30.0, description="Maximum backoff seconds")
thinking: bool = Field(
default=True,
description="Enable model thinking/reasoning mode (disable to reduce reasoning-only loops)",
)
extra_body: dict[str, Any] = Field(
default_factory=dict,
description="Extra parameters merged into the API request body (model-specific)",
)
class AgentConfig(BaseModel):
@@ -55,11 +90,19 @@ class ShellToolConfig(BaseModel):
max_output_bytes: int = Field(default=65536, description="Max output capture size in bytes")
class FileCacheConfig(BaseModel):
"""File cache configuration."""
enabled: bool = Field(default=True, description="Enable file content caching")
max_entries: int = Field(default=128, description="Maximum cached file entries (LRU eviction)")
class FilesystemToolConfig(BaseModel):
"""Filesystem tool limits."""
max_file_size_bytes: int = Field(default=1_048_576, description="Max file size for read/write")
binary_detection: bool = Field(default=True, description="Detect and reject binary files")
cache: FileCacheConfig = Field(default_factory=FileCacheConfig, description="File cache settings")
class ToolsConfig(BaseModel):
@@ -119,6 +162,10 @@ class AppConfig(BaseModel):
session: SessionConfig = Field(default_factory=SessionConfig)
debug: DebugConfig = Field(default_factory=DebugConfig)
skills: SkillsConfig = Field(default_factory=SkillsConfig)
model_profiles: dict[str, ModelProfile] = Field(
default_factory=dict,
description="Per-model overrides keyed by model name prefix",
)
@model_validator(mode="after")
def resolve_workspace_root(self) -> "AppConfig":
@@ -126,6 +173,39 @@ class AppConfig(BaseModel):
self.agent.workspace_root = self.agent.workspace_root.resolve()
return self
def get_model_profile(self, model: str) -> ModelProfile | None:
"""Find the best matching model profile by prefix.
Matches the longest prefix first (e.g., "llama3.1" beats "llama3"
for model "llama3.1:latest"). Returns None if no profile matches.
"""
model_lower = model.lower().split(":")[0] # strip tag
best_match: str | None = None
for key in self.model_profiles:
key_lower = key.lower()
if model_lower == key_lower or model_lower.startswith(key_lower):
if best_match is None or len(key) > len(best_match):
best_match = key
return self.model_profiles.get(best_match) if best_match else None
def apply_model_profile(self, model: str) -> ModelProfile | None:
"""Apply the matching model profile overrides to the active config.
Returns the applied profile, or None if no profile matched.
"""
profile = self.get_model_profile(model)
if profile is None:
return None
if profile.max_conversation_tokens is not None:
self.agent.max_conversation_tokens = profile.max_conversation_tokens
if profile.thinking is not None:
self.llm.thinking = profile.thinking
if profile.temperature is not None:
self.llm.temperature = profile.temperature
if profile.max_tokens is not None:
self.llm.max_tokens = profile.max_tokens
return profile
# Default config file location relative to project root
_DEFAULT_CONFIG_PATH = Path("config/config.yaml")

39
app/models/skill.py Normal file
View File

@@ -0,0 +1,39 @@
"""Pydantic models for structured skill packages."""
from __future__ import annotations
from pydantic import BaseModel, Field
class SkillConfigOverrides(BaseModel):
"""Scoped config overrides applied while a skill is active."""
temperature: float | None = Field(default=None, description="Override sampling temperature")
max_tokens: int | None = Field(default=None, description="Override max tokens")
tools_enable: list[str] | None = Field(
default=None, description="Whitelist — only these tools available when set"
)
tools_disable: list[str] | None = Field(
default=None, description="Blacklist — disable specific tools"
)
class SkillManifest(BaseModel):
"""Parsed skill.yaml manifest for a skill package directory."""
name: str = Field(description="Unique skill identifier")
description: str = Field(description="Human-readable skill description")
version: str = Field(default="1.0", description="Skill version")
triggers: list[str] = Field(
default_factory=list, description="Slash commands that activate this skill"
)
config_overrides: SkillConfigOverrides = Field(
default_factory=SkillConfigOverrides, description="Scoped config overrides"
)
chain: list[str] = Field(
default_factory=list, description="Skill names to run first (dependencies)"
)
prompts: list[str] = Field(
default_factory=lambda: ["prompt.md"],
description="Markdown prompt files to load, in order",
)

View File

@@ -151,6 +151,15 @@ class LLMClient:
if tools:
payload["tools"] = tools
# When thinking is disabled, inject chat_template_kwargs for backends
# that support it (Qwen 3.x thinking models).
if not self._config.thinking and self._config.model.lower().startswith(("qwen", "qwq")):
payload.setdefault("chat_template_kwargs", {})["enable_thinking"] = False
# Merge model-specific extra parameters (e.g., reasoning_effort)
if self._config.extra_body:
payload.update(self._config.extra_body)
try:
async with self._client.stream(
"POST", self._config.api_path, json=payload
@@ -162,20 +171,32 @@ class LLMClient:
status_code=response.status_code,
)
chunk_count = 0
async for line in response.aiter_lines():
if not line.startswith("data: "):
line = line.strip()
if not line:
continue
data = line[6:] # strip "data: " prefix
if data.strip() == "[DONE]":
return
# SSE format: "data: {json}" or "data: [DONE]"
if line.startswith("data: "):
data = line[6:]
if data.strip() == "[DONE]":
break
elif line.startswith("{"):
# Plain NDJSON fallback (some Ollama versions)
data = line
else:
continue
try:
yield json.loads(data)
chunk_count += 1
except json.JSONDecodeError:
logger.warning("malformed_sse_chunk", data=data[:200])
if chunk_count == 0:
logger.warning("empty_stream", model=self._config.model)
except httpx.ConnectError as e:
raise LLMConnectionError(f"Cannot connect to LLM endpoint: {e}") from e
except httpx.TimeoutException as e:

View File

@@ -4,16 +4,20 @@ from __future__ import annotations
import json
import logging
import re
import shlex
from collections.abc import Awaitable, Callable
from app.models.config import PermissionsConfig, ToolsConfig
from app.models.config import AgentMode, PermissionsConfig, ToolsConfig
logger = logging.getLogger(__name__)
# Type alias for the async prompt callback
PromptCallback = Callable[[str, str], Awaitable[bool]]
# Detect shell redirects that write to files (>, >>, heredocs)
_WRITE_REDIRECT_PATTERN = re.compile(r"(?:>\s*\S|>>|<<)")
class PermissionDenied(Exception):
"""Raised when a tool is denied execution by permissions policy."""
@@ -26,6 +30,10 @@ class PermissionsService:
shows a modal dialog. Without a callback, unlisted tools are denied.
"""
READ_ONLY_TOOLS: frozenset[str] = frozenset({
"read_file", "list_dir", "grep_files", "find_files", "finish",
})
def __init__(
self,
config: PermissionsConfig,
@@ -34,6 +42,16 @@ class PermissionsService:
self.config = config
self._tools_config = tools_config
self._prompt_callback: PromptCallback | None = None
self._mode: AgentMode = AgentMode.NORMAL
@property
def mode(self) -> AgentMode:
"""Current agent mode."""
return self._mode
@mode.setter
def mode(self, value: AgentMode) -> None:
self._mode = value
def set_prompt_callback(self, callback: PromptCallback) -> None:
"""Set the async callback used to prompt the user for permission.
@@ -63,6 +81,16 @@ class PermissionsService:
logger.info("Tool '%s' is in deny list — blocked", tool_name)
return False
if self._mode == AgentMode.AUTO:
logger.debug("Tool '%s' auto-approved (AUTO mode)", tool_name)
return True
if self._mode == AgentMode.PLAN:
if tool_name not in self.READ_ONLY_TOOLS:
logger.info("Tool '%s' blocked in Plan mode (read-only tools only)", tool_name)
return False
return True
if tool_name in self.config.auto_approve:
logger.debug("Tool '%s' is auto-approved", tool_name)
return True
@@ -104,6 +132,11 @@ class PermissionsService:
logger.info("Shell command '%s' matches denied prefix '%s'", cmd, denied)
return False
# Detect shell redirects that write to files — require approval
if _WRITE_REDIRECT_PATTERN.search(cmd):
logger.info("Shell command '%s' contains file-write redirect — requiring approval", cmd)
return None # fall through to user prompt
# Allowed commands: base executable match
if shell_config.allowed_commands:
if base_cmd in shell_config.allowed_commands:

View File

@@ -52,6 +52,10 @@ class SessionManager:
self._session_dir = workspace_root / config.session_dir
self._session_id = f"{self._workspace_hash}_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}"
def update_model(self, model: str) -> None:
"""Update the model name for session metadata."""
self._model = model
def save(self, ctx: "SessionContext") -> Path:
"""Save session state to a JSON file via atomic write.

View File

@@ -0,0 +1,234 @@
"""SkillRunner — orchestrates skill activation, chaining, config scoping, and deactivation."""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from app.agent.context import SessionContext
from app.models.config import AppConfig
from app.models.skill import SkillManifest
from app.services.skills import Skill, SkillsManager
from app.tools.registry import ToolRegistry
logger = logging.getLogger(__name__)
class SkillChainError(Exception):
"""Raised when skill chain resolution fails (e.g., cycle detected)."""
@dataclass
class _SkillSnapshot:
"""Captured state before skill activation, for restoration on deactivate."""
temperature: float
max_tokens: int
disabled_tools: set[str] = field(default_factory=set)
class SkillRunner:
"""Manages skill lifecycle: activation, chaining, config overrides, deactivation.
Only one skill can be active at a time. Activating a new skill while one
is active will first deactivate the current skill.
"""
def __init__(
self,
skills_manager: SkillsManager,
config: AppConfig,
ctx: SessionContext,
registry: ToolRegistry,
) -> None:
self._skills = skills_manager
self._config = config
self._ctx = ctx
self._registry = registry
self._active_skill: Skill | None = None
self._snapshot: _SkillSnapshot | None = None
@property
def is_active(self) -> bool:
"""Whether a skill is currently active."""
return self._active_skill is not None
@property
def active_skill_name(self) -> str | None:
"""Name of the currently active skill, or None."""
return self._active_skill.name if self._active_skill else None
@property
def active_skill(self) -> Skill | None:
"""The currently active skill, or None."""
return self._active_skill
def activate(self, skill_name: str) -> str | None:
"""Activate a skill by name.
Resolves chain dependencies (depth-first), applies config overrides,
injects prompt content into conversation context.
Args:
skill_name: Name of the skill to activate.
Returns:
The concatenated prompt content injected, or None on failure.
Raises:
SkillChainError: If chain resolution detects a cycle.
"""
skill = self._skills.get_skill(skill_name)
if skill is None:
logger.warning("Cannot activate unknown skill: %s", skill_name)
return None
# Deactivate current skill if one is active
if self._active_skill is not None:
self.deactivate()
# Resolve chain dependencies
chain = self._resolve_chain(skill, set())
# Snapshot current config for restoration
self._snapshot = _SkillSnapshot(
temperature=self._config.llm.temperature,
max_tokens=self._config.llm.max_tokens,
)
# Collect and inject chain skill prompts first
all_prompts: list[str] = []
for chained_skill in chain:
content = self._skills.load_skill(chained_skill.name)
if content:
all_prompts.append(f"[Chained skill: {chained_skill.name}]\n{content}")
# Load the target skill's prompts
content = self._skills.load_skill(skill.name)
if content:
all_prompts.append(content)
# Apply config overrides from the target skill
if skill.manifest:
self._apply_overrides(skill.manifest)
# Inject prompts into context
full_prompt = "\n\n".join(all_prompts) if all_prompts else None
if full_prompt:
self._ctx.add_message(
"system",
f"[Skill activated: {skill.name}]\n{full_prompt}",
)
self._active_skill = skill
logger.info("Skill activated: %s", skill.name)
return full_prompt
def activate_by_trigger(self, trigger: str) -> str | None:
"""Activate a skill by its /command trigger.
Args:
trigger: The trigger string (with or without leading /).
Returns:
The concatenated prompt content, or None if no skill matches.
"""
skill = self._skills.get_skill_by_trigger(trigger)
if skill is None:
return None
return self.activate(skill.name)
def deactivate(self, summary: str | None = None) -> None:
"""Deactivate the current skill, restoring config and tool state.
Args:
summary: Optional summary message to inject into context.
"""
if self._active_skill is None:
return
skill_name = self._active_skill.name
# Restore config
if self._snapshot is not None:
self._config.llm.temperature = self._snapshot.temperature
self._config.llm.max_tokens = self._snapshot.max_tokens
self._registry.restore_filter(self._snapshot.disabled_tools)
self._snapshot = None
if summary:
self._ctx.add_message(
"system",
f"[Skill completed: {skill_name}] {summary}",
)
self._active_skill = None
logger.info("Skill deactivated: %s", skill_name)
def _resolve_chain(
self, skill: Skill, in_progress: set[str], completed: set[str] | None = None,
) -> list[Skill]:
"""Depth-first resolution of skill chain dependencies.
Uses separate in_progress (current path) and completed sets to correctly
handle diamond dependencies without false cycle detection.
Args:
skill: The skill whose chain to resolve.
in_progress: Skills on the current recursion path (for cycle detection).
completed: Skills already fully resolved (skip duplicates).
Returns:
Ordered list of chained skills to activate before the target.
Raises:
SkillChainError: If a cycle is detected.
"""
if completed is None:
completed = set()
if skill.manifest is None or not skill.manifest.chain:
return []
result: list[Skill] = []
for dep_name in skill.manifest.chain:
if dep_name in completed:
continue # Already resolved via another branch (diamond dep)
if dep_name in in_progress:
raise SkillChainError(
f"Cycle detected in skill chain: {dep_name} already in progress "
f"(path: {' -> '.join(in_progress)} -> {dep_name})"
)
dep_skill = self._skills.get_skill(dep_name)
if dep_skill is None:
logger.warning("Chained skill not found: %s (required by %s)", dep_name, skill.name)
continue
in_progress.add(dep_name)
result.extend(self._resolve_chain(dep_skill, in_progress, completed))
in_progress.discard(dep_name)
completed.add(dep_name)
result.append(dep_skill)
return result
def _apply_overrides(self, manifest: SkillManifest) -> None:
"""Apply config overrides from a skill manifest."""
overrides = manifest.config_overrides
if overrides.temperature is not None:
self._config.llm.temperature = overrides.temperature
if overrides.max_tokens is not None:
self._config.llm.max_tokens = overrides.max_tokens
if overrides.tools_enable is not None or overrides.tools_disable is not None:
previous = self._registry.apply_filter(
enable=overrides.tools_enable,
disable=overrides.tools_disable,
)
# Store for restoration
if self._snapshot:
self._snapshot.disabled_tools = previous

View File

@@ -1,46 +1,92 @@
"""Skills manager — scans for and loads skill markdown files."""
"""Skills manager — scans for and loads skill packages and legacy markdown files."""
from __future__ import annotations
import logging
from pathlib import Path
from pydantic import BaseModel
import yaml
from pydantic import BaseModel, ValidationError
from app.models.config import SkillsConfig
from app.models.skill import SkillManifest
logger = logging.getLogger(__name__)
class Skill(BaseModel):
"""Metadata for a discovered skill file."""
"""Metadata for a discovered skill (package or legacy flat file)."""
name: str
description: str
path: Path
manifest: SkillManifest | None = None
class SkillsManager:
"""Discovers, indexes, and loads skill files from configured directories."""
"""Discovers, indexes, and loads skill files from configured directories.
Supports both:
- Directory-based packages (contain skill.yaml + prompt .md files)
- Legacy flat .md files (backwards compatible)
"""
def __init__(self, config: SkillsConfig, workspace_root: Path) -> None:
self._config = config
self._workspace = workspace_root
self._skills: dict[str, Skill] = {}
self._trigger_map: dict[str, str] = {} # trigger -> skill name
self._scan()
def _scan(self) -> None:
"""Scan configured directories for .md skill files."""
"""Scan configured directories for skill packages and legacy .md files."""
for skill_dir in self._config.directories:
resolved = (self._workspace / skill_dir) if not skill_dir.is_absolute() else skill_dir
if not resolved.is_dir():
logger.debug("Skills directory does not exist: %s", resolved)
continue
for md in sorted(resolved.glob("*.md")):
name = md.stem
desc = self._extract_description(md)
self._skills[name] = Skill(name=name, description=desc, path=md)
logger.debug("Discovered skill: %s (%s)", name, desc)
for entry in sorted(resolved.iterdir()):
if entry.is_dir():
self._scan_package(entry)
elif entry.suffix == ".md":
self._scan_legacy(entry)
def _scan_package(self, pkg_dir: Path) -> None:
"""Scan a directory-based skill package containing skill.yaml."""
manifest_path = pkg_dir / "skill.yaml"
if not manifest_path.exists():
logger.debug("Skipping directory without skill.yaml: %s", pkg_dir)
return
try:
raw = yaml.safe_load(manifest_path.read_text())
manifest = SkillManifest(**raw)
except (yaml.YAMLError, ValidationError, TypeError) as e:
logger.warning("Failed to parse skill manifest %s: %s", manifest_path, e)
return
skill = Skill(
name=manifest.name,
description=manifest.description,
path=pkg_dir,
manifest=manifest,
)
self._skills[manifest.name] = skill
# Register triggers
for trigger in manifest.triggers:
normalized = trigger.lstrip("/").lower()
self._trigger_map[normalized] = manifest.name
logger.debug("Discovered skill package: %s (%s)", manifest.name, manifest.description)
def _scan_legacy(self, md_path: Path) -> None:
"""Scan a legacy flat .md skill file."""
name = md_path.stem
desc = self._extract_description(md_path)
self._skills[name] = Skill(name=name, description=desc, path=md_path)
logger.debug("Discovered legacy skill: %s (%s)", name, desc)
@staticmethod
def _extract_description(path: Path) -> str:
@@ -55,10 +101,51 @@ class SkillsManager:
"""Return all discovered skills."""
return list(self._skills.values())
def get_skill(self, name: str) -> Skill | None:
"""Look up a skill by name."""
return self._skills.get(name)
def get_skill_by_trigger(self, trigger: str) -> Skill | None:
"""Look up a skill by /command trigger.
Args:
trigger: The trigger string (with or without leading /).
Returns:
The matching Skill, or None.
"""
normalized = trigger.lstrip("/").lower()
skill_name = self._trigger_map.get(normalized)
if skill_name:
return self._skills.get(skill_name)
return None
def load_skill(self, name: str) -> str | None:
"""Load the full content of a skill by name. Returns None if not found."""
"""Load the full content of a skill by name.
For package skills, concatenates all prompt .md files.
For legacy skills, returns the .md file content.
Returns:
Concatenated prompt content, or None if not found.
"""
skill = self._skills.get(name)
return skill.path.read_text() if skill else None
if skill is None:
return None
if skill.manifest is not None:
# Package skill: load prompt files
parts: list[str] = []
for prompt_file in skill.manifest.prompts:
prompt_path = skill.path / prompt_file
if prompt_path.exists():
parts.append(prompt_path.read_text())
else:
logger.warning("Prompt file not found: %s", prompt_path)
return "\n\n".join(parts) if parts else None
else:
# Legacy flat file
return skill.path.read_text()
def get_system_prompt_snippet(self) -> str:
"""Generate a snippet for the system prompt listing available skills."""
@@ -66,6 +153,10 @@ class SkillsManager:
return ""
lines = ["\nAvailable skills (invoke with /skill-name):"]
for s in self._skills.values():
lines.append(f" - /{s.name}: {s.description}")
if s.manifest and s.manifest.triggers:
trigger_str = ", ".join(s.manifest.triggers)
lines.append(f" - {trigger_str}: {s.description}")
else:
lines.append(f" - /{s.name}: {s.description}")
lines.append("To use a skill's full instructions, call the load_skill tool.")
return "\n".join(lines)

View File

@@ -60,8 +60,10 @@ class StreamHandler:
"""
thinking_notified = False
last_update_time = 0.0
chunk_count = 0
async for chunk in chunk_iter:
chunk_count += 1
self._process_chunk(chunk)
if not self._display_config.stream_output:
@@ -96,6 +98,14 @@ class StreamHandler:
self._on_done()
tool_calls = self._build_tool_calls() or None
if chunk_count > 0 and not self._accumulated_content and not tool_calls:
logger.debug(
"stream_empty_result",
chunks_received=chunk_count,
had_reasoning=bool(self._accumulated_reasoning),
)
return Message(
role="assistant",
content=self._accumulated_content or None,
@@ -183,11 +193,8 @@ class StreamHandler:
return bool(self._accumulated_reasoning) and not self._accumulated_content and not self._tool_calls
def reset(self) -> None:
"""Clear all accumulators for the next turn."""
"""Clear accumulators for the next LLM call, preserving UI callbacks."""
self._accumulated_content = ""
self._accumulated_reasoning = ""
self._tool_calls.clear()
self._usage = None
self._on_content = None
self._on_thinking = None
self._on_done = None

View File

@@ -1,13 +1,17 @@
"""Edit tools: str_replace and patch_apply."""
from __future__ import annotations
import subprocess
from pathlib import Path
from typing import Any
from pydantic import BaseModel, Field
from app.models.config import AppConfig
from app.models.tool_call import ToolResult, ToolResultStatus
from app.tools.base import BaseTool
from app.utils.file_cache import FileCache, cached_read_file
from app.utils.file_helpers import (
FileSizeError,
PathSecurityError,
@@ -37,6 +41,12 @@ class StrReplaceTool(BaseTool):
)
params_model = StrReplaceParams
def __init__(
self, workspace_root: Path, config: AppConfig, file_cache: FileCache | None = None
) -> None:
super().__init__(workspace_root, config)
self._file_cache = file_cache
def execute(
self, *, tool_call_id: str, file_path: str, old_str: str, new_str: str, **kwargs: Any
) -> ToolResult:
@@ -44,11 +54,12 @@ class StrReplaceTool(BaseTool):
# Read the file
try:
content = safe_read_file(
content = cached_read_file(
file_path,
self.workspace_root,
max_size_bytes=fs_config.max_file_size_bytes,
check_binary=fs_config.binary_detection,
cache=self._file_cache,
)
except PathSecurityError as exc:
return ToolResult(
@@ -117,8 +128,14 @@ class StrReplaceTool(BaseTool):
safe_path = resolve_safe_path(file_path, self.workspace_root)
rel_path = safe_path.relative_to(self.workspace_root)
except (PathSecurityError, ValueError):
safe_path = None
rel_path = Path(file_path)
# Pre-warm cache with the new content (we already have it in memory).
if self._file_cache is not None and safe_path is not None:
self._file_cache.invalidate(safe_path)
self._file_cache.put(safe_path, new_content)
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
@@ -144,6 +161,12 @@ class PatchApplyTool(BaseTool):
)
params_model = PatchApplyParams
def __init__(
self, workspace_root: Path, config: AppConfig, file_cache: FileCache | None = None
) -> None:
super().__init__(workspace_root, config)
self._file_cache = file_cache
def execute(self, *, tool_call_id: str, file_path: str, patch: str, **kwargs: Any) -> ToolResult:
try:
safe_path = resolve_safe_path(file_path, self.workspace_root)
@@ -195,6 +218,9 @@ class PatchApplyTool(BaseTool):
error=f"Patch failed (exit {result.returncode}): {result.stderr or result.stdout}",
)
if self._file_cache is not None:
self._file_cache.invalidate(safe_path)
try:
rel_path = safe_path.relative_to(self.workspace_root)
except ValueError:

View File

@@ -1,12 +1,16 @@
"""Filesystem tools: read_file, list_dir, write_file, make_dir, delete_file."""
from __future__ import annotations
from pathlib import Path
from typing import Any
from pydantic import BaseModel, Field
from app.models.config import AppConfig
from app.models.tool_call import ToolResult, ToolResultStatus
from app.tools.base import BaseTool
from app.utils.file_cache import FileCache, cached_read_file
from app.utils.file_helpers import (
BinaryFileError,
FileSizeError,
@@ -23,6 +27,12 @@ class ReadFileParams(BaseModel):
file_path: str = Field(description="Path to the file to read (relative to workspace root)")
class ReadManyFilesParams(BaseModel):
"""Parameters for the read_many_files tool."""
file_paths: list[str] = Field(description="List of file paths to read (relative to workspace root)")
class ReadFileTool(BaseTool):
"""Read the contents of a file within the workspace."""
@@ -30,14 +40,22 @@ class ReadFileTool(BaseTool):
description = "Read the full contents of a text file. Returns the file content as a string."
params_model = ReadFileParams
def __init__(
self, workspace_root: Path, config: AppConfig, file_cache: FileCache | None = None
) -> None:
super().__init__(workspace_root, config)
self._file_cache = file_cache
def execute(self, *, tool_call_id: str, file_path: str, **kwargs: Any) -> ToolResult:
fs_config = self.config.tools.filesystem
hits_before = self._file_cache.stats.hits if self._file_cache else 0
try:
content = safe_read_file(
content = cached_read_file(
file_path,
self.workspace_root,
max_size_bytes=fs_config.max_file_size_bytes,
check_binary=fs_config.binary_detection,
cache=self._file_cache,
)
except PathSecurityError as exc:
return ToolResult(
@@ -47,11 +65,12 @@ class ReadFileTool(BaseTool):
error=str(exc),
)
except FileNotFoundError as exc:
filename = Path(file_path).name
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=str(exc),
error=f"{exc}. Use find_files to locate it, e.g. find_files(pattern=\"{filename}\")",
)
except FileSizeError as exc:
return ToolResult(
@@ -68,6 +87,23 @@ class ReadFileTool(BaseTool):
error=str(exc),
)
# On cache hit the file is unchanged — its content is already in
# conversation context from the earlier read, so avoid resending it.
was_cache_hit = (
self._file_cache is not None
and self._file_cache.stats.hits > hits_before
)
if was_cache_hit:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.SUCCESS,
output=(
f"[Cached] {file_path} is unchanged since last read "
f"({len(content):,} chars). Content is already in conversation context."
),
)
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
@@ -76,6 +112,76 @@ class ReadFileTool(BaseTool):
)
class ReadManyFilesTool(BaseTool):
"""Read contents of multiple files at once."""
name = "read_many_files"
description = (
"Read contents of multiple files at once. Returns each file's content "
"prefixed with its path header."
)
params_model = ReadManyFilesParams
def __init__(
self, workspace_root: Path, config: AppConfig, file_cache: FileCache | None = None
) -> None:
super().__init__(workspace_root, config)
self._file_cache = file_cache
def execute(self, *, tool_call_id: str, file_paths: list[str], **kwargs: Any) -> ToolResult:
if not file_paths:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error="file_paths list is empty",
)
fs_config = self.config.tools.filesystem
sections: list[str] = []
success_count = 0
for fp in file_paths:
hits_before = self._file_cache.stats.hits if self._file_cache else 0
try:
content = cached_read_file(
fp,
self.workspace_root,
max_size_bytes=fs_config.max_file_size_bytes,
check_binary=fs_config.binary_detection,
cache=self._file_cache,
)
was_hit = (
self._file_cache is not None
and self._file_cache.stats.hits > hits_before
)
if was_hit:
sections.append(
f"=== {fp} ===\n[Cached] Unchanged since last read "
f"({len(content):,} chars). Already in conversation context."
)
else:
sections.append(f"=== {fp} ===\n{content}")
success_count += 1
except (PathSecurityError, FileNotFoundError, FileSizeError, BinaryFileError) as exc:
sections.append(f"=== {fp} ===\n[ERROR] {exc}")
if success_count == 0:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error="All files failed to read:\n" + "\n".join(sections),
)
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.SUCCESS,
output="\n".join(sections),
)
class ListDirParams(BaseModel):
"""Parameters for the list_dir tool."""
@@ -167,6 +273,12 @@ class WriteFileTool(BaseTool):
)
params_model = WriteFileParams
def __init__(
self, workspace_root: Path, config: AppConfig, file_cache: FileCache | None = None
) -> None:
super().__init__(workspace_root, config)
self._file_cache = file_cache
def execute(self, *, tool_call_id: str, file_path: str, content: str, **kwargs: Any) -> ToolResult:
fs_config = self.config.tools.filesystem
try:
@@ -191,6 +303,9 @@ class WriteFileTool(BaseTool):
error=str(exc),
)
if self._file_cache is not None:
self._file_cache.invalidate(safe_path)
try:
rel_path = safe_path.relative_to(self.workspace_root)
except ValueError:
@@ -272,6 +387,12 @@ class DeleteFileTool(BaseTool):
description = "Delete a single file. Does not delete directories."
params_model = DeleteFileParams
def __init__(
self, workspace_root: Path, config: AppConfig, file_cache: FileCache | None = None
) -> None:
super().__init__(workspace_root, config)
self._file_cache = file_cache
def execute(self, *, tool_call_id: str, file_path: str, **kwargs: Any) -> ToolResult:
try:
safe_path = resolve_safe_path(file_path, self.workspace_root)
@@ -309,6 +430,9 @@ class DeleteFileTool(BaseTool):
error=f"Failed to delete file: {exc}",
)
if self._file_cache is not None:
self._file_cache.invalidate(safe_path)
try:
rel_path = safe_path.relative_to(self.workspace_root)
except ValueError:

View File

@@ -8,6 +8,7 @@ from typing import TYPE_CHECKING, Any
from app.models.config import AppConfig
from app.tools.base import BaseTool
from app.utils.file_cache import FileCache
if TYPE_CHECKING:
from app.services.skills import SkillsManager
@@ -20,6 +21,7 @@ class ToolRegistry:
def __init__(self) -> None:
self._tools: dict[str, BaseTool] = {}
self._disabled: set[str] = set()
def register(self, tool: BaseTool) -> None:
"""Register a tool instance. Raises ValueError on duplicate name."""
@@ -29,26 +31,78 @@ class ToolRegistry:
logger.debug("Registered tool: %s", tool.name)
def get(self, name: str) -> BaseTool | None:
"""Look up a tool by name."""
"""Look up a tool by name. Returns None if disabled or not found."""
if name in self._disabled:
return None
return self._tools.get(name)
def get_all(self) -> dict[str, BaseTool]:
"""Return all registered tools."""
return dict(self._tools)
"""Return all registered tools (excluding disabled)."""
return {k: v for k, v in self._tools.items() if k not in self._disabled}
def get_openai_tools_schema(self) -> list[dict[str, Any]]:
"""Return OpenAI function-calling schemas for all registered tools."""
return [tool.get_openai_schema() for tool in self._tools.values()]
"""Return OpenAI function-calling schemas for all active tools."""
return [
tool.get_openai_schema()
for tool in self._tools.values()
if tool.name not in self._disabled
]
def apply_filter(
self,
*,
enable: list[str] | None = None,
disable: list[str] | None = None,
) -> set[str]:
"""Apply a tool filter, returning the previous disabled set for restoration.
Args:
enable: If set, only these tools (plus always-on tools) are available.
disable: Specific tools to disable.
Returns:
The previous disabled set (snapshot for restore).
"""
previous = set(self._disabled)
if enable is not None:
# Whitelist mode: disable everything not in the enable list
self._disabled = {name for name in self._tools if name not in enable}
elif disable is not None:
# Blacklist mode: add to existing disabled set (preserves global disables)
self._disabled = set(self._disabled) | set(disable)
else:
self._disabled = set()
return previous
def restore_filter(self, previous: set[str]) -> None:
"""Restore a previous filter state."""
self._disabled = previous
def all_tool_names(self) -> list[str]:
"""Return all registered tool names (including disabled)."""
return list(self._tools.keys())
def create_default_registry(
workspace_root: Path,
config: AppConfig,
skills_manager: SkillsManager | None = None,
skill_runner: object | None = None,
file_cache: FileCache | None = None,
) -> ToolRegistry:
"""Create a ToolRegistry populated with all built-in tools."""
"""Create a ToolRegistry populated with all built-in tools.
Args:
workspace_root: Workspace root path.
config: Application configuration.
skills_manager: Optional skills manager for skill tools.
skill_runner: Optional SkillRunner for package skill activation.
file_cache: Optional file cache shared across file-reading tools.
"""
# Read tools
from app.tools.filesystem import ListDirTool, ReadFileTool
from app.tools.filesystem import ListDirTool, ReadFileTool, ReadManyFilesTool
# Write tools
from app.tools.filesystem import DeleteFileTool, MakeDirTool, WriteFileTool
@@ -68,7 +122,8 @@ def create_default_registry(
registry = ToolRegistry()
# Read
registry.register(ReadFileTool(workspace_root, config))
registry.register(ReadFileTool(workspace_root, config, file_cache=file_cache))
registry.register(ReadManyFilesTool(workspace_root, config, file_cache=file_cache))
registry.register(ListDirTool(workspace_root, config))
# Search
@@ -76,13 +131,13 @@ def create_default_registry(
registry.register(FindFilesTool(workspace_root, config))
# Write
registry.register(WriteFileTool(workspace_root, config))
registry.register(WriteFileTool(workspace_root, config, file_cache=file_cache))
registry.register(MakeDirTool(workspace_root, config))
registry.register(DeleteFileTool(workspace_root, config))
registry.register(DeleteFileTool(workspace_root, config, file_cache=file_cache))
# Edit
registry.register(StrReplaceTool(workspace_root, config))
registry.register(PatchApplyTool(workspace_root, config))
registry.register(StrReplaceTool(workspace_root, config, file_cache=file_cache))
registry.register(PatchApplyTool(workspace_root, config, file_cache=file_cache))
# Shell
registry.register(RunCommandTool(workspace_root, config))
@@ -92,8 +147,11 @@ def create_default_registry(
# Skills (conditional)
if skills_manager is not None:
from app.tools.skills import LoadSkillTool
from app.services.skill_runner import SkillRunner as SkillRunnerType
from app.tools.skills import FinishSkillTool, LoadSkillTool
registry.register(LoadSkillTool(workspace_root, config, skills_manager))
runner = skill_runner if isinstance(skill_runner, SkillRunnerType) else None
registry.register(LoadSkillTool(workspace_root, config, skills_manager, runner))
registry.register(FinishSkillTool(workspace_root, config, runner))
return registry

View File

@@ -1,5 +1,6 @@
"""Shell tool: run_command."""
import re
import shlex
import subprocess
from typing import Any
@@ -11,6 +12,9 @@ from app.tools.base import BaseTool
_DEFAULT_TIMEOUT = 30
# Detect shell redirects that write to files (>, >>, heredocs)
_WRITE_REDIRECT_PATTERN = re.compile(r"(?:>\s*\S|>>|<<)")
class RunCommandParams(BaseModel):
"""Parameters for the run_command tool."""
@@ -43,6 +47,18 @@ class RunCommandTool(BaseTool):
error=f"Command denied: matches blocked prefix '{denied}'",
)
# Defense-in-depth: flag file-write redirects in tool result
if _WRITE_REDIRECT_PATTERN.search(command):
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=(
f"Command contains file-write redirect (>, >>, or <<) "
f"which bypasses file-write permissions. Use write_file instead."
),
)
# Allow check: first token must be in allowed_commands
try:
tokens = shlex.split(command)

View File

@@ -1,17 +1,20 @@
"""Load skill tool — allows the LLM to load skill instructions on demand."""
"""Skill toolsload and finish skills during agent operation."""
from __future__ import annotations
from pathlib import Path
from typing import Any, ClassVar
from typing import TYPE_CHECKING, Any, ClassVar
from pydantic import BaseModel, Field
from app.models.config import AppConfig
from app.models.tool_call import ToolResult, ToolResultStatus
from app.services.skills import SkillsManager
from app.tools.base import BaseTool
if TYPE_CHECKING:
from app.services.skill_runner import SkillRunner
from app.services.skills import SkillsManager
class LoadSkillParams(BaseModel):
"""Parameters for the load_skill tool."""
@@ -23,6 +26,8 @@ class LoadSkillTool(BaseTool):
"""Load a skill's full instructions by name.
Use when a skill is relevant to the current task.
For package skills, this activates the full skill lifecycle
(config overrides, chaining, prompt injection).
"""
name: ClassVar[str] = "load_skill"
@@ -37,14 +42,22 @@ class LoadSkillTool(BaseTool):
workspace_root: Path,
config: AppConfig,
skills_manager: SkillsManager,
skill_runner: SkillRunner | None = None,
) -> None:
super().__init__(workspace_root, config)
self._skills = skills_manager
self._runner = skill_runner
def set_skill_runner(self, runner: SkillRunner) -> None:
"""Late-bind the SkillRunner (avoids circular init dependencies)."""
self._runner = runner
def execute(self, *, tool_call_id: str, **kwargs: Any) -> ToolResult:
skill_name: str = kwargs["name"]
content = self._skills.load_skill(skill_name)
if content is None:
# Check if skill exists
skill = self._skills.get_skill(skill_name)
if skill is None:
available = [s.name for s in self._skills.list_skills()]
return ToolResult(
tool_call_id=tool_call_id,
@@ -52,9 +65,94 @@ class LoadSkillTool(BaseTool):
status=ToolResultStatus.ERROR,
error=f"Unknown skill '{skill_name}'. Available: {available}",
)
# For package skills with a runner, use full activation flow
if skill.manifest is not None and self._runner is not None:
content = self._runner.activate(skill_name)
if content is None:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=f"Failed to activate skill '{skill_name}'",
)
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.SUCCESS,
output=f"Skill '{skill_name}' activated.\n\n{content}",
)
# Legacy skill: just load content
content = self._skills.load_skill(skill_name)
if content is None:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=f"Failed to load skill '{skill_name}'",
)
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.SUCCESS,
output=content,
)
class FinishSkillParams(BaseModel):
"""Parameters for the finish_skill tool."""
summary: str = Field(
default="Skill complete.",
description="Brief summary of what was accomplished during the skill",
)
class FinishSkillTool(BaseTool):
"""Signal that the active skill is complete and should be deactivated.
Restores config overrides and tool availability to pre-skill state.
The agent loop continues after this (unlike the finish tool).
"""
name: ClassVar[str] = "finish_skill"
description: ClassVar[str] = (
"Call this when the active skill's task is complete. "
"Deactivates the skill and restores normal config. "
"The conversation continues after this."
)
params_model: ClassVar[type[BaseModel]] = FinishSkillParams
def __init__(
self,
workspace_root: Path,
config: AppConfig,
skill_runner: SkillRunner | None = None,
) -> None:
super().__init__(workspace_root, config)
self._runner = skill_runner
def set_skill_runner(self, runner: SkillRunner) -> None:
"""Late-bind the SkillRunner (avoids circular init dependencies)."""
self._runner = runner
def execute(self, *, tool_call_id: str, **kwargs: Any) -> ToolResult:
summary: str = kwargs.get("summary", "Skill complete.")
if self._runner is None or not self._runner.is_active:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error="No skill is currently active.",
)
skill_name = self._runner.active_skill_name
self._runner.deactivate(summary=summary)
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.SUCCESS,
output=f"Skill '{skill_name}' completed: {summary}",
)

View File

@@ -10,17 +10,19 @@ from rich.panel import Panel
from rich.text import Text
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.widgets import Header, RichLog
from textual.widgets import Input, RichLog
from textual import work
from app.agent.context import SessionContext
from app.agent.loop import AgentLoop
from app.models.config import AppConfig
from app.models.config import AgentMode, AppConfig
from app.services.llm import LLMClient
from app.services.permissions import PermissionsService
from app.services.session import SessionManager
from app.services.streaming import StreamHandler
from app.tools.registry import create_default_registry
from app.ui.widgets import (
HeaderPanel,
HistoryInput,
PermissionModal,
SessionResumeModal,
@@ -44,6 +46,7 @@ class SneakyCodeApp(App):
BINDINGS = [
Binding("ctrl+c", "cancel_or_quit", "Cancel/Quit", show=False),
Binding("ctrl+p", "cycle_mode", "Cycle Mode"),
]
def __init__(self, config: AppConfig, session_mgr: SessionManager | None = None) -> None:
@@ -57,12 +60,12 @@ class SneakyCodeApp(App):
self._permissions: PermissionsService | None = None
self._debug_logger = None
self._skills_manager = None
self._skill_runner = None
self._current_worker: Worker | None = None
self._cancel_count = 0
self.sub_title = config.llm.model
def compose(self) -> ComposeResult:
yield Header()
yield HeaderPanel(model_name=self._config.llm.model)
yield RichLog(id="chat-log", highlight=True, markup=True)
yield StreamingStatic("", id="streaming")
yield StatusBar()
@@ -72,6 +75,9 @@ class SneakyCodeApp(App):
"""Initialize agent components after the app is mounted."""
setup_logging_for_tui()
# Apply model profile for the initial model before creating context
self._config.apply_model_profile(self._config.llm.model)
self._ctx = SessionContext(self._config)
# Create long-lived agent dependencies (reused across turns)
@@ -94,19 +100,52 @@ class SneakyCodeApp(App):
self._config.skills, self._config.agent.workspace_root
)
# Create file cache if enabled
self._file_cache = None
fs_cache_cfg = self._config.tools.filesystem.cache
if fs_cache_cfg.enabled:
from app.utils.file_cache import FileCache
self._file_cache = FileCache(max_entries=fs_cache_cfg.max_entries)
# Create tool registry (SkillRunner wired after registry exists)
self._tool_registry = create_default_registry(
self._config.agent.workspace_root,
self._config,
skills_manager=self._skills_manager,
file_cache=self._file_cache,
)
# Create SkillRunner and late-bind it to skill tools
if self._skills_manager is not None and self._tool_registry is not None:
from app.services.skill_runner import SkillRunner
self._skill_runner = SkillRunner(
self._skills_manager,
self._config,
self._ctx,
self._tool_registry,
)
# Late-bind runner to skill tools already in the registry
load_tool = self._tool_registry.get("load_skill")
if load_tool and hasattr(load_tool, "set_skill_runner"):
load_tool.set_skill_runner(self._skill_runner)
finish_tool = self._tool_registry.get("finish_skill")
if finish_tool and hasattr(finish_tool, "set_skill_runner"):
finish_tool.set_skill_runner(self._skill_runner)
# Set up permission prompt callback
async def permission_prompt(tool_name: str, description: str) -> bool:
return await self._show_permission_modal(tool_name, description)
self._permissions.set_prompt_callback(permission_prompt)
# Offer session resume if configured
# Offer session resume if configured (must run in a worker for push_screen_wait)
self._offer_session_resume()
@work
async def _offer_session_resume(self) -> None:
"""Offer to resume a previous session, running in a worker for modal support."""
if self._session_mgr and self._config.session.offer_resume:
saved = self._session_mgr.load_latest()
if saved:
@@ -118,7 +157,6 @@ class SneakyCodeApp(App):
log.write(Text("Session restored", style="bold green"))
else:
log.write(Text("Starting fresh session", style="cyan"))
self.query_one(HistoryInput).focus()
async def on_input_submitted(self, event: Input.Submitted) -> None:
@@ -131,6 +169,10 @@ class SneakyCodeApp(App):
event.input.record(user_input)
log = self.query_one("#chat-log", RichLog)
# Echo user prompt (condensed for multi-line)
from app.utils.display import render_user_message
log.write(render_user_message(user_input))
# Handle slash commands
if user_input.startswith("/"):
await self._handle_slash_command(user_input, log)
@@ -147,7 +189,26 @@ class SneakyCodeApp(App):
async def _handle_slash_command(self, command: str, log: RichLog) -> None:
"""Process slash commands."""
cmd = command.lower()
if cmd == "/quit":
if cmd == "/help":
from rich.table import Table
table = Table(title="SneakyCode Commands", show_lines=False)
table.add_column("Command", style="cyan", no_wrap=True)
table.add_column("Description")
table.add_row("/help", "Show this help message")
table.add_row("/quit, /exit, /bye", "Save session and exit")
table.add_row("/clear", "Clear conversation history")
table.add_row("/history", "Show conversation history")
table.add_row("/save", "Manually save session")
table.add_row("/session", "Show session info (messages, tokens, start time)")
table.add_row("/models, /model", "List available Ollama models")
table.add_row("/model <name>", "Switch to a different model")
table.add_row("/mode", "Show current agent mode")
table.add_row("/mode normal|plan|auto", "Switch agent mode")
table.add_row("/skills", "List available skills")
table.add_row("/<skill>", "Load a skill by name")
log.write(table)
elif cmd in ("/quit", "/exit", "/bye"):
self._save_session()
self.exit()
elif cmd == "/clear":
@@ -173,7 +234,7 @@ class SneakyCodeApp(App):
f"Started: {self._ctx.start_time.isoformat()}",
style="cyan",
))
elif cmd.startswith("/models"):
elif cmd.split()[0] in ("/models", "/model"):
parts = command.split(maxsplit=1)
if len(parts) == 1:
# List available models
@@ -197,8 +258,44 @@ class SneakyCodeApp(App):
else:
new_model = parts[1].strip()
self._config.llm.model = new_model
self.sub_title = new_model
log.write(Text(f"Switched to model: {new_model}", style="bold green"))
if self._session_mgr:
self._session_mgr.update_model(new_model)
# Apply model-specific profile overrides
profile = self._config.apply_model_profile(new_model)
if profile and self._ctx:
# Update token budget if the profile overrides it
self._ctx.token_counter.budget = self._config.agent.max_conversation_tokens
self.query_one(HeaderPanel).update_model(new_model)
header = self.query_one(HeaderPanel)
header.update_tokens(
self._ctx.estimated_tokens if self._ctx else 0,
self._config.agent.max_conversation_tokens,
)
msg = f"Switched to model: {new_model}"
if profile:
overrides = []
if profile.max_conversation_tokens is not None:
overrides.append(f"tokens={profile.max_conversation_tokens:,}")
if profile.thinking is not None:
overrides.append(f"thinking={'on' if profile.thinking else 'off'}")
if overrides:
msg += f" ({', '.join(overrides)})"
log.write(Text(msg, style="bold green"))
elif cmd.split()[0] == "/mode":
parts = command.split(maxsplit=1)
if len(parts) == 1:
current = self._permissions.mode
log.write(Text(f"Current mode: {current.value}", style="cyan"))
else:
mode_str = parts[1].strip().lower()
try:
new_mode = AgentMode(mode_str)
except ValueError:
log.write(Text(f"Unknown mode: {mode_str}. Use normal, plan, or auto.", style="yellow"))
return
self._permissions.mode = new_mode
self.query_one(HeaderPanel).update_mode(new_mode)
log.write(Text(f"Switched to {new_mode.value} mode", style="bold green"))
elif cmd == "/skills":
if self._skills_manager:
skills = self._skills_manager.list_skills()
@@ -216,7 +313,24 @@ class SneakyCodeApp(App):
else:
log.write(Text("Skills system is disabled", style="yellow"))
else:
# Try as skill invocation
# Try as skill trigger (package skill via SkillRunner)
if self._skill_runner and self._skills_manager:
skill = self._skills_manager.get_skill_by_trigger(cmd.lstrip("/"))
if skill is not None:
content = self._skill_runner.activate(skill.name)
status_bar = self.query_one(StatusBar)
status_bar.set_active_skill(skill.name)
log.write(Text(f"Skill activated: {skill.name}", style="bold green"))
# Run an agent turn so the LLM sees the skill context
self._cancel_count = 0
self._current_worker = self.run_worker(
self._run_agent_turn(f"[Skill activated: {skill.name}]"),
name="agent-turn",
exclusive=True,
)
return
# Try as legacy skill invocation
skill_name = cmd.lstrip("/")
if self._skills_manager:
content = self._skills_manager.load_skill(skill_name)
@@ -225,7 +339,7 @@ class SneakyCodeApp(App):
self._ctx.add_message("system", f"[Skill: {skill_name}]\n{content}")
log.write(Text(f"Loaded skill: {skill_name}", style="bold green"))
return
log.write(Text(f"Unknown command: {command}", style="yellow"))
log.write(Text(f"Unknown command: {command}", style="yellow"))
async def _run_agent_turn(self, user_input: str) -> None:
"""Run a single agent turn (called as a worker)."""
@@ -243,12 +357,19 @@ class SneakyCodeApp(App):
status_bar.start_streaming()
# Set up streaming UI callbacks
header = self.query_one(HeaderPanel)
def on_content(content: str) -> None:
streaming_widget.update(
Panel(Markdown(content), title="Assistant", border_style="green", expand=True)
)
streaming_widget.show_streaming()
status_bar.update_stream_tokens(len(content) // 4)
stream_tokens = len(content) // 4
status_bar.update_stream_tokens(stream_tokens)
header.update_tokens(
self._ctx.estimated_tokens + stream_tokens,
self._ctx.token_counter.budget,
)
def on_thinking() -> None:
streaming_widget.update(Text("Thinking...", style="dim"))
@@ -265,12 +386,23 @@ class SneakyCodeApp(App):
self._tool_registry, self._permissions, display,
debug_logger=self._debug_logger,
skills_manager=self._skills_manager,
skill_runner=self._skill_runner,
)
await agent.run_turn(user_input)
status_bar.stop_streaming()
# Update token display in header
header = self.query_one(HeaderPanel)
header.update_tokens(self._ctx.estimated_tokens, self._ctx.token_counter.budget)
# Update skill indicator (skill may have been deactivated via finish_skill)
if self._skill_runner and not self._skill_runner.is_active:
status_bar.set_active_skill(None)
elif self._skill_runner and self._skill_runner.is_active:
status_bar.set_active_skill(self._skill_runner.active_skill_name)
# Auto-save
if self._config.session.auto_save:
self._save_session()
@@ -290,6 +422,21 @@ class SneakyCodeApp(App):
log = self.query_one("#chat-log", RichLog)
log.write(Text("⚠ Cancelling... (press Ctrl+C again to quit)", style="yellow"))
def action_cycle_mode(self) -> None:
"""Cycle through agent modes: Normal → Plan → Auto → Normal."""
if self._permissions is None:
return
cycle = {
AgentMode.NORMAL: AgentMode.PLAN,
AgentMode.PLAN: AgentMode.AUTO,
AgentMode.AUTO: AgentMode.NORMAL,
}
new_mode = cycle[self._permissions.mode]
self._permissions.mode = new_mode
self.query_one(HeaderPanel).update_mode(new_mode)
log = self.query_one("#chat-log", RichLog)
log.write(Text(f"Mode: {new_mode.value}", style="bold green"))
async def on_unmount(self) -> None:
"""Clean up the LLM client on app shutdown."""
if self._client is not None:

View File

@@ -55,8 +55,8 @@ Screen {
Input {
dock: bottom;
margin: 0;
border: heavy darkcyan;
padding: 0;
}
Header {
dock: top;
}
/* HeaderPanel styles are in DEFAULT_CSS on the widget itself */

View File

@@ -11,6 +11,82 @@ from textual.widgets import Button, Input, Static
from rich.text import Text
from app.models.config import AgentMode
# ---------------------------------------------------------------------------
# Header Panel
# ---------------------------------------------------------------------------
class HeaderPanel(Static):
"""Single-line header showing model name, agent mode, and token usage."""
DEFAULT_CSS = """
HeaderPanel {
dock: top;
height: 1;
background: darkcyan;
color: $text;
padding: 0 2;
}
"""
def __init__(self, model_name: str) -> None:
super().__init__("")
self._model_name = model_name
self._mode: AgentMode = AgentMode.NORMAL
self._tokens: int = 0
self._budget: int = 0
def on_resize(self) -> None:
self._refresh_display()
def update_model(self, name: str) -> None:
"""Update the displayed model name."""
self._model_name = name
self._refresh_display()
def update_mode(self, mode: AgentMode) -> None:
"""Update the displayed agent mode."""
self._mode = mode
self._refresh_display()
def update_tokens(self, tokens: int, budget: int) -> None:
"""Update the token usage display."""
self._tokens = tokens
self._budget = budget
self._refresh_display()
def _refresh_display(self) -> None:
"""Rebuild the header text."""
left = Text.assemble(
("⚡ SneakyCode", "bold"),
"",
(self._model_name, "bold"),
)
mode_styles = {
AgentMode.NORMAL: ("NORMAL", "bold black on white"),
AgentMode.PLAN: ("PLAN", "bold black on yellow"),
AgentMode.AUTO: ("AUTO", "bold white on red"),
}
mode_label, mode_style = mode_styles[self._mode]
mode_text = Text.assemble((" ", mode_style), (mode_label, mode_style), (" ", mode_style))
right = Text(f"~{self._tokens:,} / {self._budget:,} tokens")
# Pad between sections
total_content = left.plain + " " + mode_text.plain + " " + right.plain
available = self.size.width if self.size.width > 0 else 80
gap_left = max(1, (available - len(total_content)) // 2)
gap_right = max(1, available - len(total_content) - gap_left)
full = Text.assemble(
left, " " * gap_left, mode_text, " " * gap_right, right,
)
self.update(full)
# ---------------------------------------------------------------------------
# Modal Dialogs
@@ -139,20 +215,13 @@ class StatusBar(Static):
def __init__(self) -> None:
super().__init__("")
self._tokens: int = 0
self._budget: int = 0
self._iteration: int = 0
self._max_iterations: int = 0
self._streaming: bool = False
self._spinner_frame: int = 0
self._spinner_timer: Timer | None = None
self._stream_tokens: int = 0
def update_tokens(self, tokens: int, budget: int) -> None:
"""Update the token usage display."""
self._tokens = tokens
self._budget = budget
self._refresh_display()
self._active_skill: str | None = None
def update_iteration(self, iteration: int, max_iterations: int) -> None:
"""Update the iteration count display."""
@@ -184,16 +253,21 @@ class StatusBar(Static):
self._spinner_frame = (self._spinner_frame + 1) % len(self._SPINNER)
self._refresh_display()
def set_active_skill(self, skill_name: str | None) -> None:
"""Set or clear the active skill indicator."""
self._active_skill = skill_name
self._refresh_display()
def _refresh_display(self) -> None:
"""Rebuild the status bar text."""
parts: list[str] = []
if self._active_skill:
parts.append(f"[Skill: {self._active_skill}]")
if self._streaming:
spinner = self._SPINNER[self._spinner_frame]
parts.append(f"{spinner} Thinking")
if self._stream_tokens > 0:
parts.append(f"~{self._stream_tokens:,} tokens")
if self._budget > 0:
parts.append(f"Tokens: ~{self._tokens:,} / {self._budget:,}")
if self._max_iterations > 0:
parts.append(f"Iteration {self._iteration}/{self._max_iterations}")
self.update(Text(" \u2502 ".join(parts), style="dim"))

View File

@@ -10,6 +10,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Protocol
from rich.markdown import Markdown
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
@@ -43,14 +44,27 @@ if TYPE_CHECKING:
# ---------------------------------------------------------------------------
def render_user_message(content: str) -> Panel:
"""Render a user message as a styled panel."""
return Panel(content, title="You", border_style="cyan", expand=False)
def render_user_message(content: str) -> Text:
"""Render a condensed user prompt as a single styled line.
Multi-line input is collapsed to the first line with a line count suffix.
Long single lines are truncated.
"""
lines = content.splitlines()
first = lines[0] if lines else content
max_len = 120
if len(first) > max_len:
first = first[:max_len] + ""
suffix = f" (+{len(lines) - 1} lines)" if len(lines) > 1 else ""
text = Text()
text.append("You: ", style="bold cyan")
text.append(first + suffix, style="cyan")
return text
def render_assistant_message(content: str) -> Panel:
"""Render an assistant message as a styled panel."""
return Panel(content, title="Assistant", border_style="green", expand=True)
return Panel(Markdown(content), title="Assistant", border_style="green", expand=True)
def render_tool_call(name: str, args: str) -> Text:
@@ -222,8 +236,8 @@ def print_success(message: str) -> None:
def print_user_message(content: str) -> None:
"""Print a user message in a styled panel."""
console.print(Panel(content, title="You", border_style="cyan", expand=False))
"""Print a condensed user prompt line."""
console.print(render_user_message(content))
def print_assistant_message(content: str) -> None:

185
app/utils/file_cache.py Normal file
View File

@@ -0,0 +1,185 @@
"""File cache with LRU eviction and mtime-based invalidation."""
from __future__ import annotations
from collections import OrderedDict
from dataclasses import dataclass, field
from pathlib import Path
from app.utils.file_helpers import (
BinaryFileError,
FileSizeError,
PathSecurityError,
check_file_size,
is_binary_file,
resolve_safe_path,
)
@dataclass(slots=True)
class CacheEntry:
"""A cached file's content and modification timestamp."""
content: str
mtime_ns: int
@dataclass
class CacheStats:
"""Running statistics for a FileCache instance."""
hits: int = 0
misses: int = 0
invalidations: int = 0
evictions: int = 0
@property
def hit_rate(self) -> float:
"""Return cache hit rate as a float between 0.0 and 1.0."""
total = self.hits + self.misses
if total == 0:
return 0.0
return self.hits / total
class FileCache:
"""LRU file-content cache with mtime-based invalidation.
Keyed by resolved absolute ``Path``. Each lookup performs a cheap
``stat()`` syscall to verify the file hasn't changed on disk — if the
nanosecond mtime differs the entry is evicted and the caller gets a
cache miss.
Not thread-safe (single-threaded agent loop).
"""
def __init__(self, max_entries: int = 128) -> None:
self._max_entries = max_entries
self._entries: OrderedDict[Path, CacheEntry] = OrderedDict()
self._stats = CacheStats()
# -- public API --------------------------------------------------
def get(self, path: Path) -> str | None:
"""Return cached content if *path* hasn't changed, else ``None``.
A ``stat()`` call checks ``st_mtime_ns``; on mismatch the stale
entry is silently removed.
"""
entry = self._entries.get(path)
if entry is None:
self._stats.misses += 1
return None
try:
current_mtime_ns = path.stat().st_mtime_ns
except OSError:
# File gone — evict and miss.
self._remove(path)
self._stats.misses += 1
return None
if current_mtime_ns != entry.mtime_ns:
self._remove(path)
self._stats.invalidations += 1
self._stats.misses += 1
return None
# Cache hit — move to end (most-recently used).
self._entries.move_to_end(path)
self._stats.hits += 1
return entry.content
def put(self, path: Path, content: str) -> None:
"""Store *content* for *path* with its current ``st_mtime_ns``.
Evicts the least-recently-used entry when over capacity.
"""
try:
mtime_ns = path.stat().st_mtime_ns
except OSError:
# Can't stat — don't cache.
return
if path in self._entries:
# Update existing; move to end.
self._entries[path] = CacheEntry(content=content, mtime_ns=mtime_ns)
self._entries.move_to_end(path)
else:
self._entries[path] = CacheEntry(content=content, mtime_ns=mtime_ns)
# Evict LRU if over capacity.
while len(self._entries) > self._max_entries:
self._entries.popitem(last=False)
self._stats.evictions += 1
def invalidate(self, path: Path) -> None:
"""Remove *path* from the cache if present."""
if path in self._entries:
del self._entries[path]
self._stats.invalidations += 1
def clear(self) -> None:
"""Remove all entries."""
self._entries.clear()
@property
def stats(self) -> CacheStats:
"""Return the running cache statistics."""
return self._stats
def __len__(self) -> int:
return len(self._entries)
# -- internals ---------------------------------------------------
def _remove(self, path: Path) -> None:
"""Delete an entry without bumping invalidation stats."""
self._entries.pop(path, None)
def cached_read_file(
path: str | Path,
workspace_root: Path,
max_size_bytes: int = 1_048_576,
check_binary: bool = True,
cache: FileCache | None = None,
) -> str:
"""Read a file with full security checks, using *cache* when available.
Security checks (path sandboxing, size limit, binary detection) run on
**every** call — only the ``Path.read_text()`` I/O is skipped on a cache
hit.
When *cache* is ``None`` this behaves identically to
:func:`~app.utils.file_helpers.safe_read_file`.
Raises:
PathSecurityError: If the path escapes the workspace.
FileSizeError: If the file is too large.
BinaryFileError: If the file is binary and *check_binary* is True.
FileNotFoundError: If the file does not exist.
"""
safe_path = resolve_safe_path(path, workspace_root)
if not safe_path.exists():
raise FileNotFoundError(f"File not found: {safe_path}")
check_file_size(safe_path, max_size_bytes)
if check_binary and is_binary_file(safe_path):
raise BinaryFileError(f"File appears to be binary: {safe_path}")
# Try cache.
if cache is not None:
cached = cache.get(safe_path)
if cached is not None:
return cached
# Cache miss (or no cache) — read from disk.
content = safe_path.read_text(encoding="utf-8")
if cache is not None:
cache.put(safe_path, content)
return content

View File

@@ -36,6 +36,11 @@ class TokenCounter:
"""The configured token budget."""
return self._budget
@budget.setter
def budget(self, value: int) -> None:
"""Update the token budget (e.g., when switching models)."""
self._budget = value
@property
def cumulative_usage(self) -> TokenUsage:
"""Cumulative token usage across all tracked calls."""

View File

@@ -10,14 +10,32 @@ llm:
max_retries: 3
retry_backoff_base: 1.0
retry_backoff_max: 30.0
thinking: false # Disable model thinking/reasoning mode (reduces reasoning-only loops)
# Extra parameters merged into the API request body (model-specific).
# Examples:
# OpenAI: reasoning_effort: "low"
extra_body: {}
agent:
max_iterations: 25
max_conversation_tokens: 32000
max_conversation_tokens: 32000 # Default token budget (overridden by model_profiles)
workspace_root: "."
truncation_keep_recent: 10
truncation_threshold: 0.85
# Per-model overrides — matched by longest model name prefix.
# Unset fields fall through to the defaults above.
model_profiles:
llama3:
max_conversation_tokens: 120000
thinking: false
qwen:
max_conversation_tokens: 32000
thinking: false
qwq:
max_conversation_tokens: 32000
thinking: true
permissions:
auto_approve:
- read_file
@@ -43,7 +61,6 @@ tools:
- pytest
- ruff
- ls
- cat
- head
- tail
- wc
@@ -51,6 +68,10 @@ tools:
- grep
- find
- echo
- which
- jq
- type
- file
denied_commands:
- rm -rf /
- sudo
@@ -60,6 +81,9 @@ tools:
filesystem:
max_file_size_bytes: 1048576 # 1 MB
binary_detection: true
cache:
enabled: true
max_entries: 128
session:
session_dir: ".sneakycode/sessions"

View File

@@ -1,144 +0,0 @@
# SneakyCode Implementation Roadmap
A phased plan progressing from bare-bones foundation to full autonomous coding agent.
---
## Phase 1 — Foundation: Models, Config, and Utilities
Establish the data layer and shared infrastructure everything else builds on.
| File | Description |
|------|-------------|
| `app/models/config.py` | Pydantic v2 config model — load and validate `config/config.yaml` |
| `app/models/message.py` | Message schema (role, content, tool_calls) |
| `app/models/tool_call.py` | ToolCall and ToolResult schemas |
| `app/utils/logging.py` | Centralized logger with Rich handler |
| `app/utils/display.py` | Rich console output helpers (stub — expanded in Phase 2) |
| `app/utils/file_helpers.py` | Safe path resolution, binary detection, size guards |
| `app/utils/token_counter.py` | Approximate token usage tracking (character-based heuristic for v1) |
| `app/main.py` | Entrypoint stub — arg parsing, config load, Rich console setup |
**Exit criteria:** `python -m app.main --help` runs, config loads and validates, models can be instantiated and serialized.
---
## Phase 2 — TUI and Interactive Shell
Get a working interactive terminal before wiring up the LLM.
| File | Description |
|------|-------------|
| `app/main.py` | Rich-based interactive REPL loop — prompt for user input, display responses |
| `app/utils/display.py` | Formatted output for agent messages, tool calls, errors, token usage |
| `app/agent/context.py` | Session state and conversation history management |
**Exit criteria:** User can type messages into a styled REPL, see them echoed back with formatting, and conversation history is tracked in memory.
---
## Phase 3 — LLM Integration (Ollama)
Connect to the local LLM and stream responses into the TUI.
| File | Description |
|------|-------------|
| `app/services/llm.py` | Async httpx client wrapping Ollama's OpenAI-compatible `/v1/chat/completions` endpoint |
| `app/services/streaming.py` | SSE parsing, Rich live display, tool call extraction from accumulated stream |
**Integration:** Wire LLM into the REPL — user message goes to LLM, streamed response displays in real time.
**Exit criteria:** User can chat with the local model through the TUI with streamed output. Tool call JSON is parsed from the stream but not yet executed.
---
## Phase 4 — Tool Framework and Core Tools
Build the tool abstraction and implement safe, read-only tools first.
| File | Description |
|------|-------------|
| `app/tools/base.py` | `BaseTool` ABC and `ToolResult` dataclass |
| `app/tools/registry.py` | Tool registration, discovery, and JSON schema export for LLM system prompt |
| `app/services/permissions.py` | Two-tier approval gating (auto-approve reads; prompt for writes/deletes/shell) |
| `app/tools/filesystem.py` | `read_file`, `list_dir` |
| `app/tools/search.py` | `grep_files`, `find_files` |
**Exit criteria:** Tools register themselves, schemas export correctly for inclusion in the system prompt, read-only tools execute and return `ToolResult` objects. Permissions service gates execution.
---
## Phase 5 — Agent Loop (ReAct)
The core autonomy layer — reason, act, observe, repeat.
| File | Description |
|------|-------------|
| `app/agent/loop.py` | ReAct cycle: send conversation to LLM, parse tool calls, execute, feed results back, repeat |
**Key behaviors:**
- System prompt constructed with tool schemas from registry
- Permissions checks before each tool execution
- Loop termination on: plain-text response (no tool calls), explicit `finish` tool call, or `max_iterations` exceeded
**Exit criteria:** Agent can autonomously answer questions about the codebase by chaining `read_file`, `list_dir`, `grep_files`, and `find_files` tool calls in a multi-turn loop.
---
## Phase 6 — Write Tools and Shell
Unlock the agent's ability to modify code and run commands.
| File | Description |
|------|-------------|
| `app/tools/filesystem.py` | `write_file`, `make_dir`, `delete_file` (additions to existing module) |
| `app/tools/edit.py` | `str_replace` (unique-match required), `patch_apply` |
| `app/tools/shell.py` | `run_command` with command allow/deny lists and output truncation |
**All write/shell operations gated through permissions service.**
**Exit criteria:** Agent can autonomously create files, edit code via string replacement, and run shell commands — all with user approval for destructive operations.
---
## Phase 7 — Polish and Hardening
Production-readiness: error handling, resource limits, and documentation.
| Area | Description |
|------|-------------|
| Error handling | Recovery from malformed tool calls, LLM errors, network timeouts in agent loop |
| Token budget | Conversation truncation or summarization when approaching context limit |
| Graceful shutdown | Clean Ctrl+C handling, session state preservation |
| Testing | End-to-end integration tests (`tests/integration/`), unit tests (`tests/unit/`) |
| Documentation | `README.md` with setup and usage instructions, `docs/tools.md` tool reference |
**Exit criteria:** Agent handles edge cases gracefully, tests pass, and a new user can set up and use the project from the README alone.
---
## File Coverage
Every file from the project structure in CLAUDE.md is accounted for:
| File | Phase |
|------|-------|
| `app/main.py` | 1, 2 |
| `app/models/config.py` | 1 |
| `app/models/message.py` | 1 |
| `app/models/tool_call.py` | 1 |
| `app/utils/logging.py` | 1 |
| `app/utils/display.py` | 1, 2 |
| `app/utils/file_helpers.py` | 1 |
| `app/utils/token_counter.py` | 1 |
| `app/agent/context.py` | 2 |
| `app/services/llm.py` | 3 |
| `app/services/streaming.py` | 3 |
| `app/tools/base.py` | 4 |
| `app/tools/registry.py` | 4 |
| `app/services/permissions.py` | 4 |
| `app/tools/filesystem.py` | 4, 6 |
| `app/tools/search.py` | 4 |
| `app/agent/loop.py` | 5 |
| `app/tools/edit.py` | 6 |
| `app/tools/shell.py` | 6 |

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,192 @@
# Textual TUI Redesign — Design Spec
## Overview
Replace the current sequential print-and-scroll terminal UI with a full persistent split-screen TUI using Textual. Input is pinned at the bottom, scrollable message history above, with a header showing app/model info and a footer showing token usage and iteration count.
## Layout
```
+------------------- Header --------------------+
| SneakyCode qwen2.5-coder:32b |
+-----------------------------------------------+
| |
| +--- You ---+ |
| | prompt | <- RichLog widget |
| +-----------+ (handles own scrolling) |
| |
| Thinking... |
| |
| +-- Assistant --+ |
| | response... | |
| +---------------+ |
| |
| > read_file README.md -- 148 lines, 5128 ch |
| > grep_files "pattern" -- 3 matches |
| |
+-----------------------------------------------+
| Tokens: ~1,511 / 32,000 | Iteration 5/25 | <- StatusBar
+-----------------------------------------------+
| > [input cursor] | <- Input widget
+-----------------------------------------------+
```
**Widget hierarchy (no VerticalScroll wrapper — RichLog handles its own scrolling):**
- `Header` — Textual built-in, title="SneakyCode", subtitle=model name
- `RichLog` (id="chat-log") — main scroll area, accepts Rich renderables via `.write()`
- `StreamingStatic` — persistent hidden `Static` widget, shown/hidden during streaming (avoids mount/unmount overhead)
- `StatusBar` — custom `Static` widget, 1 row, docked above Input
- `Input` — Textual built-in, pinned at bottom
## New Files
### `app/ui/app.py` — Textual App
SneakyCodeApp subclasses `textual.app.App`. Responsibilities:
- `compose()` yields: Header, RichLog(id="chat-log"), StreamingStatic(id="streaming"), StatusBar(id="status"), Input
- `on_input_submitted()` handler: reads input value, clears input, writes user panel to chat log, dispatches agent turn as a worker
- Agent turn runs via `run_worker()` (async worker, NOT threaded) so the UI stays responsive. Since the worker is async and on the event loop, widget methods can be called directly — no `call_from_thread()` needed.
- Slash commands (/quit, /history, /clear, /save, /session) parsed from input before dispatching to agent
- Holds references to config, SessionContext, AgentLoop (created in `on_mount`)
- Header subtitle set to model name from config
- `on_worker_state_changed()` handler: catches worker errors and writes error panels to RichLog
- Ctrl+C binding: cancels the running agent worker (does NOT quit the app). A second Ctrl+C or `/quit` exits.
### `app/ui/widgets.py` — Custom Widgets
**StatusBar** — A simple `Static` widget styled as a footer bar. Displays token usage and iteration count. Updated by the agent loop after each LLM step via `status_bar.update(renderable)`.
**StreamingStatic** — A `Static` widget that stays mounted but hidden. During streaming, it becomes visible and receives `update()` calls with partial content. When streaming ends, it is hidden and its content is cleared. This avoids the overhead of mounting/unmounting on every LLM response.
### `app/ui/styles.tcss` — Textual CSS
Layout rules:
- RichLog fills available height (fraction-based sizing, e.g. `height: 1fr`)
- StreamingStatic: `display: none` by default, shown during streaming
- StatusBar is 1 row, docked bottom above Input
- Input is 1 row, docked at very bottom
- Color scheme matches existing SNEAKYCODE_THEME (cyan for user, green for assistant, magenta for tools, dim for metadata)
## Modified Files
### `app/main.py`
- Remove `_run_repl()` async function entirely
- Remove `console.input()` usage
- `main()` creates config, runs preflight via `asyncio.run(_preflight(config))` (before Textual starts — this is fine, separate event loop), then instantiates and runs `SneakyCodeApp(config).run()`
- CLI arg parsing stays (--config, -v, --log-file)
- Session resume: `_offer_session_resume()` moves into `SneakyCodeApp.on_mount()` — instead of `console.input()`, push a modal screen asking "Resume previous session? [y/n]" with button/key handlers
- Auto-save: triggers after each agent turn completes (in the worker completion handler)
- SIGTERM handler: removed — Textual manages its own signal handling and shutdown lifecycle
### `app/services/streaming.py`
- Remove `from rich.live import Live` and `from rich.spinner import Spinner`
- `process_stream()` no longer creates a `Rich.Live` context
- Instead, accepts callback parameters:
- `on_content: Callable[[str], None]` — called with accumulated content on each content chunk
- `on_thinking: Callable[[], None]` — called once when first reasoning token arrives
- `on_done: Callable[[], None]` — called when streaming completes
- **Throttling:** Content callback fires at most every 100ms (track last update time, skip intermediate chunks). Final content always fires on stream end.
- Since the agent runs as an async worker (on the event loop), callbacks can directly call widget methods — no `call_from_thread()` needed.
- All accumulation and tool-call parsing logic stays identical
### `app/utils/display.py`
- All `print_*` functions become `render_*` functions that return Rich renderables:
- `render_user_message(content) -> Panel`
- `render_assistant_message(content) -> Panel`
- `render_tool_call(name, args) -> Text`
- `render_tool_result(name, output, is_error) -> Text`
- `render_iteration_header(iteration, max_iter) -> Text`
- `render_warning(message) -> Text`
- `render_error(message) -> Text`
- `print_banner()` removed — Header widget replaces it
- `print_token_usage()` becomes `render_token_usage() -> Text` for the StatusBar
- `print_history()` becomes `render_history() -> Table` — written to RichLog, may need width constraints for narrow terminals
- A `DisplayAdapter` class wraps a `RichLog` reference and provides `write_user_message()`, `write_tool_call()`, etc. methods that call `render_*` then `rich_log.write()`
### `app/agent/loop.py`
- `AgentLoop.__init__()` accepts a `DisplayAdapter` instead of calling `display.py` print functions directly
- All display calls route through the adapter: `self._display.write_tool_call(name, args)`, `self._display.write_iteration_header(i, max)`, etc.
- `_execute_tool_calls()` becomes `async def _execute_tool_calls()` to support async permission checks
- The loop logic (ReAct pattern, retry, truncation) is unchanged
### `app/services/permissions.py`
- `PermissionsService.check()` becomes `async def check()`
- Instead of `rich.prompt.Confirm.ask()` (blocking stdin read), it:
1. Creates an `asyncio.Event`
2. Posts a custom message to the app requesting a permission modal
3. The app pushes a modal screen with the permission question and approve/deny buttons
4. When the user responds, the modal sets the event and stores the result
5. `check()` awaits the event and reads the result
- Edge cases: dismiss without choosing = deny. Ctrl+C during modal = deny. Focus returns to Input after modal dismisses.
### `app/utils/logging.py`
- **Critical change:** The shared `console = Console()` instance will corrupt the Textual display since Textual takes exclusive terminal control
- When running under Textual: disable `RichHandler` (console handler), keep only the file handler
- Add a `setup_logging_for_tui()` function that reconfigures logging to file-only mode
- Called from `SneakyCodeApp.on_mount()` before any agent work begins
- The `console` object still exists but should not be used for output during TUI mode — all output goes through the DisplayAdapter
- Consider: `--log-file` becomes required (or auto-set to a default) when running in TUI mode, so logs are not lost
## Unchanged Files
- `app/services/llm.py` — HTTP client, SSE parsing untouched
- `app/agent/context.py` — session state untouched
- `app/models/*` — all data models untouched
- `app/tools/*` — all tool implementations untouched
- `app/utils/file_helpers.py` — path safety untouched
- `app/utils/token_counter.py` — token counting untouched
## Key Patterns
### Streaming in Textual
The agent loop runs as an async worker (on the event loop, NOT threaded). During streaming:
1. App shows `StreamingStatic` widget, writes "Thinking..." initially
2. Worker calls `StreamHandler.process_stream(chunks, on_content=..., on_thinking=..., on_done=...)`
3. `on_content` callback: updates `StreamingStatic` with `Panel(Markdown(partial_content), title="Assistant", border_style="green")` — throttled to ~100ms intervals
4. `on_done` callback: hides `StreamingStatic`, writes final content to `RichLog` via `DisplayAdapter`
Since the worker is async (not threaded), callbacks run on the event loop and can call widget methods directly.
### Permission Prompts
1. Agent loop (in async worker) calls `await permissions.check(operation, details)`
2. `check()` creates an `asyncio.Event` and posts `PermissionRequest` message to the app
3. App handles `PermissionRequest`: pushes a modal screen with the question, approve/deny buttons
4. Modal screen: on button press, stores result and sets the event
5. `check()` awaits the event, reads result, returns approved/denied
6. Focus management: Input loses focus when modal appears, regains focus when modal dismisses
7. Default on dismiss/Ctrl+C: deny
### Cancellation
- Ctrl+C (first press): cancels the running agent worker via `worker.cancel()`. The agent loop should check for cancellation between iterations.
- Ctrl+C (second press) or `/quit`: exits the app via `app.exit()`
## Dependencies
- Add `textual>=4.0.0` to pyproject.toml dependencies
## Verification
1. Run the app — header shows app name + model, no console corruption
2. Type a prompt — user panel appears in scroll area, input clears
3. During LLM streaming — assistant response types out live (throttled) in the scroll area
4. Thinking indicator shows during reasoning-only phases
5. Tool calls appear as compact lines in the scroll area
6. Footer shows token usage and iteration count, updating each step
7. Scroll area auto-scrolls to bottom on new content
8. /quit, /clear, /history commands work from the input
9. Permission prompts show as modal, approve/deny work, focus returns to input
10. Ctrl+C cancels running agent turn without quitting
11. Worker errors display as error panels in the scroll area
12. Logging goes to file only — no console corruption
13. Session resume works on startup via modal dialog

View File

@@ -1 +1,12 @@
Pressing up should cycle history like claude code.
# UI Issues
on /clear we need to reset the token counter in the header panel.
# Bugs
# Improvements
add -p to command line args so that the agent can run the prompt and return data directly via STDOUT
# Open questions:
How might we pass a directory to this app and have it use that directory as it's workspace so I don't have to copy files or do odd things to work in other directories.
How do we handle huge files not taking up so many tokens?

View File

@@ -21,10 +21,10 @@ from app.utils.display import (
class TestRenderFunctions:
def test_render_user_message_returns_panel(self) -> None:
def test_render_user_message_returns_text(self) -> None:
result = render_user_message("hello")
assert isinstance(result, Panel)
assert result.title == "You"
assert isinstance(result, Text)
assert "hello" in result.plain
def test_render_assistant_message_returns_panel(self) -> None:
result = render_assistant_message("response")
@@ -72,7 +72,7 @@ class TestDisplayAdapter:
adapter.write_user_message("hello")
mock_log.write.assert_called_once()
arg = mock_log.write.call_args[0][0]
assert isinstance(arg, Panel)
assert isinstance(arg, Text)
def test_write_tool_call(self) -> None:
mock_log = MagicMock()

View File

@@ -0,0 +1,314 @@
"""Tests for the file cache with LRU eviction and mtime invalidation."""
import os
import time
from pathlib import Path
import pytest
from app.models.config import AppConfig, load_config
from app.models.tool_call import ToolResultStatus
from app.tools.filesystem import ReadFileTool, ReadManyFilesTool
from app.utils.file_cache import CacheStats, FileCache, cached_read_file
from app.utils.file_helpers import BinaryFileError, FileSizeError, PathSecurityError
# ---------------------------------------------------------------------------
# FileCache unit tests
# ---------------------------------------------------------------------------
class TestFileCache:
def test_put_and_get_roundtrip(self, tmp_path: Path) -> None:
cache = FileCache()
f = tmp_path / "hello.txt"
f.write_text("hello world")
cache.put(f, "hello world")
assert cache.get(f) == "hello world"
def test_get_returns_none_for_missing_key(self, tmp_path: Path) -> None:
cache = FileCache()
assert cache.get(tmp_path / "nope.txt") is None
def test_mtime_change_causes_miss(self, tmp_path: Path) -> None:
cache = FileCache()
f = tmp_path / "data.txt"
f.write_text("v1")
cache.put(f, "v1")
# Mutate the file so mtime changes
time.sleep(0.05) # ensure mtime differs
f.write_text("v2")
assert cache.get(f) is None # stale → miss
assert cache.stats.invalidations == 1
def test_lru_eviction_at_capacity(self, tmp_path: Path) -> None:
cache = FileCache(max_entries=3)
files = []
for i in range(4):
f = tmp_path / f"f{i}.txt"
f.write_text(f"content-{i}")
files.append(f)
# Fill cache to capacity
for f in files[:3]:
cache.put(f, f.read_text())
assert len(cache) == 3
# Adding a 4th evicts the LRU (files[0])
cache.put(files[3], files[3].read_text())
assert len(cache) == 3
assert cache.get(files[0]) is None # evicted
assert cache.stats.evictions == 1
# files[1..3] still present
for f in files[1:]:
assert cache.get(f) is not None
def test_invalidate_removes_entry(self, tmp_path: Path) -> None:
cache = FileCache()
f = tmp_path / "rm.txt"
f.write_text("bye")
cache.put(f, "bye")
assert len(cache) == 1
cache.invalidate(f)
assert len(cache) == 0
assert cache.get(f) is None
assert cache.stats.invalidations == 1
def test_invalidate_noop_for_missing(self) -> None:
cache = FileCache()
cache.invalidate(Path("/nonexistent"))
assert cache.stats.invalidations == 0
def test_clear_empties_cache(self, tmp_path: Path) -> None:
cache = FileCache()
for i in range(5):
f = tmp_path / f"c{i}.txt"
f.write_text(str(i))
cache.put(f, str(i))
assert len(cache) == 5
cache.clear()
assert len(cache) == 0
def test_stats_accuracy(self, tmp_path: Path) -> None:
cache = FileCache(max_entries=2)
a = tmp_path / "a.txt"
b = tmp_path / "b.txt"
c = tmp_path / "c.txt"
a.write_text("a")
b.write_text("b")
c.write_text("c")
# Miss
cache.get(a)
assert cache.stats.misses == 1
assert cache.stats.hits == 0
# Put + hit
cache.put(a, "a")
cache.get(a)
assert cache.stats.hits == 1
# Fill + evict
cache.put(b, "b")
cache.put(c, "c") # evicts a
assert cache.stats.evictions == 1
def test_hit_rate(self) -> None:
stats = CacheStats(hits=3, misses=1)
assert stats.hit_rate == pytest.approx(0.75)
def test_hit_rate_zero_total(self) -> None:
stats = CacheStats()
assert stats.hit_rate == 0.0
def test_file_deleted_after_caching(self, tmp_path: Path) -> None:
cache = FileCache()
f = tmp_path / "gone.txt"
f.write_text("here")
cache.put(f, "here")
f.unlink()
assert cache.get(f) is None # stat fails → miss
def test_put_skips_when_stat_fails(self) -> None:
cache = FileCache()
cache.put(Path("/totally/nonexistent"), "data")
assert len(cache) == 0
def test_get_moves_to_end(self, tmp_path: Path) -> None:
"""Accessing an entry makes it most-recently-used, protecting from eviction."""
cache = FileCache(max_entries=3)
files = []
for i in range(3):
f = tmp_path / f"lru{i}.txt"
f.write_text(f"c{i}")
files.append(f)
cache.put(f, f"c{i}")
# Touch files[0] to make it MRU
cache.get(files[0])
# Add a new entry — files[1] (LRU) should be evicted, not files[0]
extra = tmp_path / "extra.txt"
extra.write_text("x")
cache.put(extra, "x")
assert cache.get(files[0]) is not None # protected by access
assert cache.get(files[1]) is None # evicted
# ---------------------------------------------------------------------------
# cached_read_file tests
# ---------------------------------------------------------------------------
class TestCachedReadFile:
def test_without_cache_matches_safe_read(self, tmp_path: Path) -> None:
f = tmp_path / "plain.txt"
f.write_text("hello")
content = cached_read_file(f, tmp_path, cache=None)
assert content == "hello"
def test_populates_on_miss_returns_on_hit(self, tmp_path: Path) -> None:
cache = FileCache()
f = tmp_path / "cached.txt"
f.write_text("data")
# First call: miss → read from disk → populate cache
content1 = cached_read_file(f, tmp_path, cache=cache)
assert content1 == "data"
assert cache.stats.misses == 1
assert cache.stats.hits == 0
# Second call: hit → from cache
content2 = cached_read_file(f, tmp_path, cache=cache)
assert content2 == "data"
assert cache.stats.hits == 1
def test_security_checks_run_on_cached_path(self, tmp_path: Path) -> None:
cache = FileCache()
with pytest.raises(PathSecurityError):
cached_read_file("/etc/passwd", tmp_path, cache=cache)
def test_binary_check_runs_on_cached_path(self, tmp_path: Path) -> None:
cache = FileCache()
f = tmp_path / "bin.dat"
f.write_bytes(b"\x00binary\x00")
with pytest.raises(BinaryFileError):
cached_read_file(f, tmp_path, cache=cache)
def test_size_check_runs_on_cached_path(self, tmp_path: Path) -> None:
cache = FileCache()
f = tmp_path / "big.txt"
f.write_text("x" * 200)
# First read populates cache
cached_read_file(f, tmp_path, max_size_bytes=1000, cache=cache)
# Now make file too big on disk — security check should catch it
# even though content is cached
f.write_text("x" * 2000)
with pytest.raises(FileSizeError):
cached_read_file(f, tmp_path, max_size_bytes=1000, cache=cache)
def test_file_not_found(self, tmp_path: Path) -> None:
cache = FileCache()
with pytest.raises(FileNotFoundError):
cached_read_file(tmp_path / "nope.txt", tmp_path, cache=cache)
# ---------------------------------------------------------------------------
# Tool-level cache-hit dedup tests
# ---------------------------------------------------------------------------
@pytest.fixture
def config() -> AppConfig:
return load_config()
@pytest.fixture
def tmp_workspace(tmp_path: Path, config: AppConfig) -> tuple[Path, AppConfig]:
config.agent.workspace_root = tmp_path
return tmp_path, config
class TestReadFileToolCacheHit:
def test_first_read_returns_full_content(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
ws, cfg = tmp_workspace
cache = FileCache()
(ws / "hello.txt").write_text("hello world")
tool = ReadFileTool(ws, cfg, file_cache=cache)
result = tool.run("tc-1", {"file_path": "hello.txt"})
assert result.status == ToolResultStatus.SUCCESS
assert result.output == "hello world"
def test_second_read_returns_cached_message(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
ws, cfg = tmp_workspace
cache = FileCache()
(ws / "hello.txt").write_text("hello world")
tool = ReadFileTool(ws, cfg, file_cache=cache)
tool.run("tc-1", {"file_path": "hello.txt"})
result2 = tool.run("tc-2", {"file_path": "hello.txt"})
assert result2.status == ToolResultStatus.SUCCESS
assert "[Cached]" in result2.output
assert "hello.txt" in result2.output
assert "hello world" not in result2.output
def test_changed_file_returns_full_content_again(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
ws, cfg = tmp_workspace
cache = FileCache()
f = ws / "data.txt"
f.write_text("v1")
tool = ReadFileTool(ws, cfg, file_cache=cache)
tool.run("tc-1", {"file_path": "data.txt"})
# Mutate file so mtime changes
time.sleep(0.05)
f.write_text("v2")
result2 = tool.run("tc-2", {"file_path": "data.txt"})
assert result2.status == ToolResultStatus.SUCCESS
assert result2.output == "v2"
assert "[Cached]" not in result2.output
def test_no_cache_always_returns_content(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
ws, cfg = tmp_workspace
(ws / "hello.txt").write_text("hello")
tool = ReadFileTool(ws, cfg, file_cache=None)
r1 = tool.run("tc-1", {"file_path": "hello.txt"})
r2 = tool.run("tc-2", {"file_path": "hello.txt"})
assert r1.output == "hello"
assert r2.output == "hello"
class TestReadManyFilesToolCacheHit:
def test_cached_files_get_short_message(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
ws, cfg = tmp_workspace
cache = FileCache()
(ws / "a.txt").write_text("alpha")
(ws / "b.txt").write_text("bravo")
tool = ReadManyFilesTool(ws, cfg, file_cache=cache)
# First read — full content
r1 = tool.run("tc-1", {"file_paths": ["a.txt", "b.txt"]})
assert "alpha" in r1.output
assert "bravo" in r1.output
# Second read — cached messages
r2 = tool.run("tc-2", {"file_paths": ["a.txt", "b.txt"]})
assert "[Cached]" in r2.output
assert "alpha" not in r2.output
assert "bravo" not in r2.output

View File

@@ -0,0 +1,69 @@
"""Tests for the read_many_files tool."""
from pathlib import Path
import pytest
from app.models.config import AppConfig, load_config
from app.models.tool_call import ToolResultStatus
from app.tools.filesystem import ReadManyFilesTool
@pytest.fixture
def config() -> AppConfig:
return load_config()
@pytest.fixture
def tmp_workspace(tmp_path: Path, config: AppConfig) -> tuple[Path, AppConfig]:
"""Create a temporary workspace for read_many_files tests."""
config.agent.workspace_root = tmp_path
return tmp_path, config
class TestReadManyFilesTool:
def test_read_multiple_files(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
ws, cfg = tmp_workspace
(ws / "a.txt").write_text("alpha")
(ws / "b.txt").write_text("bravo")
tool = ReadManyFilesTool(ws, cfg)
result = tool.run("tc-1", {"file_paths": ["a.txt", "b.txt"]})
assert result.status == ToolResultStatus.SUCCESS
assert "=== a.txt ===" in result.output
assert "alpha" in result.output
assert "=== b.txt ===" in result.output
assert "bravo" in result.output
def test_partial_failure(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
ws, cfg = tmp_workspace
(ws / "exists.txt").write_text("hello")
tool = ReadManyFilesTool(ws, cfg)
result = tool.run("tc-2", {"file_paths": ["exists.txt", "missing.txt"]})
assert result.status == ToolResultStatus.SUCCESS
assert "hello" in result.output
assert "[ERROR]" in result.output
assert "=== missing.txt ===" in result.output
def test_all_files_fail(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
ws, cfg = tmp_workspace
tool = ReadManyFilesTool(ws, cfg)
result = tool.run("tc-3", {"file_paths": ["no1.txt", "no2.txt"]})
assert result.status == ToolResultStatus.ERROR
assert "All files failed" in (result.error or "")
def test_empty_file_paths(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
ws, cfg = tmp_workspace
tool = ReadManyFilesTool(ws, cfg)
result = tool.run("tc-4", {"file_paths": []})
assert result.status == ToolResultStatus.ERROR
assert "empty" in (result.error or "").lower()
def test_path_security_inline_error(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
ws, cfg = tmp_workspace
(ws / "safe.txt").write_text("ok")
tool = ReadManyFilesTool(ws, cfg)
result = tool.run("tc-5", {"file_paths": ["safe.txt", "../../etc/passwd"]})
assert result.status == ToolResultStatus.SUCCESS
assert "ok" in result.output
assert "[ERROR]" in result.output
assert "outside" in result.output.lower()

View File

@@ -90,6 +90,6 @@ class TestRunCommandTool:
# Create a file in the workspace to verify cwd
(ws / "marker.txt").write_text("found")
tool = RunCommandTool(ws, cfg)
result = tool.run("tc-9", {"command": "cat marker.txt"})
result = tool.run("tc-9", {"command": "head marker.txt"})
assert result.status == ToolResultStatus.SUCCESS
assert "found" in result.output

View File

@@ -108,7 +108,7 @@ class TestToolRegistry:
registry = create_default_registry(workspace, config)
names = set(registry.get_all().keys())
assert names == {
"read_file", "list_dir", "grep_files", "find_files",
"read_file", "read_many_files", "list_dir", "grep_files", "find_files",
"write_file", "make_dir", "delete_file",
"str_replace", "patch_apply",
"run_command",
@@ -118,7 +118,7 @@ class TestToolRegistry:
def test_schema_export(self, workspace: Path, config: AppConfig) -> None:
registry = create_default_registry(workspace, config)
schemas = registry.get_openai_tools_schema()
assert len(schemas) == 11
assert len(schemas) == 12
assert all(s["type"] == "function" for s in schemas)