Add Phase 6: write tools, shell, and edit tools with reasoning-only fix

Implement 6 new agent tools — write_file, make_dir, delete_file,
str_replace, patch_apply, run_command — bringing the agent from
read-only observer to active code modifier. All write/shell operations
are gated through the existing permissions service.

Also fix a bug where qwen3.5 thinking mode produces reasoning tokens
but no content after tool results, causing the agent to silently exit.
The loop now detects reasoning-only responses, retries twice, then
injects a nudge message to break the model out of its thinking loop.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-11 09:45:48 -05:00
parent 0cf0d01657
commit f60c47a85f
12 changed files with 956 additions and 3 deletions

113
app/tools/shell.py Normal file
View File

@@ -0,0 +1,113 @@
"""Shell tool: run_command."""
import shlex
import subprocess
from typing import Any
from pydantic import BaseModel, Field
from app.models.tool_call import ToolResult, ToolResultStatus
from app.tools.base import BaseTool
_DEFAULT_TIMEOUT = 30
class RunCommandParams(BaseModel):
"""Parameters for the run_command tool."""
command: str = Field(description="Shell command to execute")
timeout: int | None = Field(default=None, description="Timeout in seconds (default: 30)")
class RunCommandTool(BaseTool):
"""Execute a shell command within the workspace."""
name = "run_command"
description = (
"Run a shell command in the workspace directory. "
"Only allowed commands may be executed; dangerous commands are blocked."
)
params_model = RunCommandParams
def execute(self, *, tool_call_id: str, command: str, timeout: int | None = None, **kwargs: Any) -> ToolResult:
shell_config = self.config.tools.shell
effective_timeout = timeout if timeout is not None else _DEFAULT_TIMEOUT
# Deny check: prefix match against full command string
for denied in shell_config.denied_commands:
if command.startswith(denied):
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=f"Command denied: matches blocked prefix '{denied}'",
)
# Allow check: first token must be in allowed_commands
try:
tokens = shlex.split(command)
except ValueError as exc:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=f"Failed to parse command: {exc}",
)
if not tokens:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error="Empty command",
)
base_cmd = tokens[0]
if shell_config.allowed_commands and base_cmd not in shell_config.allowed_commands:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=f"Command '{base_cmd}' is not in the allowed commands list",
)
# Execute
try:
result = subprocess.run(
command,
shell=True,
capture_output=True,
text=True,
timeout=effective_timeout,
cwd=self.workspace_root,
)
except subprocess.TimeoutExpired:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=f"Command timed out after {effective_timeout}s",
)
except OSError as exc:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=f"Failed to execute command: {exc}",
)
# Combine output and truncate
output = result.stdout + result.stderr
max_bytes = shell_config.max_output_bytes
if len(output.encode("utf-8", errors="replace")) > max_bytes:
output = output[:max_bytes] + "\n... (output truncated)"
if result.returncode != 0:
output = f"Exit code: {result.returncode}\n{output}"
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.SUCCESS,
output=output,
)