Implement 6 new agent tools — write_file, make_dir, delete_file, str_replace, patch_apply, run_command — bringing the agent from read-only observer to active code modifier. All write/shell operations are gated through the existing permissions service. Also fix a bug where qwen3.5 thinking mode produces reasoning tokens but no content after tool results, causing the agent to silently exit. The loop now detects reasoning-only responses, retries twice, then injects a nudge message to break the model out of its thinking loop. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
209 lines
6.7 KiB
Python
209 lines
6.7 KiB
Python
"""Edit tools: str_replace and patch_apply."""
|
|
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from pydantic import BaseModel, Field
|
|
|
|
from app.models.tool_call import ToolResult, ToolResultStatus
|
|
from app.tools.base import BaseTool
|
|
from app.utils.file_helpers import (
|
|
FileSizeError,
|
|
PathSecurityError,
|
|
resolve_safe_path,
|
|
safe_read_file,
|
|
safe_write_file,
|
|
)
|
|
|
|
# Upper bound, in seconds, on how long the external `patch` process may run
# before PatchApplyTool gives up and reports a timeout error.
_PATCH_TIMEOUT = 30
|
|
|
|
|
|
class StrReplaceParams(BaseModel):
    """Parameters for the str_replace tool."""

    # Target file, given relative to the workspace root.
    file_path: str = Field(
        description="Path to the file to edit (relative to workspace root)",
    )
    # Literal text to locate; the tool refuses to act unless it occurs exactly once.
    old_str: str = Field(
        description="The exact string to find and replace (must be unique in file)",
    )
    # Text substituted for the single match of old_str.
    new_str: str = Field(
        description="The replacement string",
    )
|
|
|
|
|
|
class StrReplaceTool(BaseTool):
    """Replace a unique string occurrence in a file.

    The file is read with ``safe_read_file``, the replacement is made in
    memory, and the result is written back with ``safe_write_file``. Path
    security, missing-file and size-limit failures from those helpers are
    reported as ERROR results instead of being raised.
    """

    name = "str_replace"
    description = (
        "Replace exactly one occurrence of old_str with new_str in a file. "
        "Fails if old_str is not found or appears more than once."
    )
    params_model = StrReplaceParams

    def execute(
        self, *, tool_call_id: str, file_path: str, old_str: str, new_str: str, **kwargs: Any
    ) -> ToolResult:
        """Replace the single occurrence of *old_str* in *file_path* with *new_str*.

        Args:
            tool_call_id: Identifier echoed back on the returned result.
            file_path: File to edit; resolved against ``self.workspace_root``.
            old_str: Text to find. Must be non-empty and occur exactly once.
            new_str: Text substituted for ``old_str``.
            **kwargs: Accepted and ignored.

        Returns:
            A SUCCESS result naming the edited file, or an ERROR result when
            ``old_str`` is empty, not found, or found more than once, or when
            reading/writing the file fails a path, existence or size check.
        """

        def fail(message: str) -> ToolResult:
            # Single place that shapes every ERROR result of this tool.
            return ToolResult(
                tool_call_id=tool_call_id,
                tool_name=self.name,
                status=ToolResultStatus.ERROR,
                error=message,
            )

        # str.count("") is len(content) + 1, so without this guard an empty
        # old_str would be reported with a bogus duplicate count, or would
        # silently prepend new_str when the file itself is empty.
        if not old_str:
            return fail("old_str must not be empty")

        fs_config = self.config.tools.filesystem

        # Read the file
        try:
            content = safe_read_file(
                file_path,
                self.workspace_root,
                max_size_bytes=fs_config.max_file_size_bytes,
                check_binary=fs_config.binary_detection,
            )
        except (PathSecurityError, FileNotFoundError, FileSizeError) as exc:
            return fail(str(exc))

        # The edit is only unambiguous when there is exactly one match.
        count = content.count(old_str)
        if count == 0:
            return fail(f"old_str not found in {file_path}")
        if count > 1:
            return fail(f"old_str appears {count} times in {file_path} (must be unique)")

        # Perform replacement and write back
        new_content = content.replace(old_str, new_str, 1)
        try:
            safe_write_file(
                file_path,
                new_content,
                self.workspace_root,
                max_size_bytes=fs_config.max_file_size_bytes,
            )
        except (PathSecurityError, FileSizeError) as exc:
            return fail(str(exc))

        # Report a workspace-relative path when one can be derived; otherwise
        # fall back to the path exactly as the caller supplied it.
        try:
            safe_path = resolve_safe_path(file_path, self.workspace_root)
            rel_path = safe_path.relative_to(self.workspace_root)
        except (PathSecurityError, ValueError):
            rel_path = Path(file_path)

        return ToolResult(
            tool_call_id=tool_call_id,
            tool_name=self.name,
            status=ToolResultStatus.SUCCESS,
            output=f"Replaced 1 occurrence in {rel_path}",
        )
|
|
|
|
|
|
class PatchApplyParams(BaseModel):
    """Parameters for the patch_apply tool."""

    # Target file, given relative to the workspace root.
    file_path: str = Field(
        description="Path to the file to patch (relative to workspace root)",
    )
    # Patch text in unified diff format.
    patch: str = Field(
        description="Unified diff format patch to apply",
    )
|
|
|
|
|
|
class PatchApplyTool(BaseTool):
    """Apply a unified diff patch to a file.

    The work is delegated to the system ``patch`` executable, which receives
    the diff on standard input and the resolved target file as its operand.
    """

    name = "patch_apply"
    description = (
        "Apply a unified diff (patch) to a file. The patch must be in standard "
        "unified diff format."
    )
    params_model = PatchApplyParams

    def execute(self, *, tool_call_id: str, file_path: str, patch: str, **kwargs: Any) -> ToolResult:
        """Apply *patch* to *file_path* using the external ``patch`` command.

        Args:
            tool_call_id: Identifier echoed back on the returned result.
            file_path: File to patch; resolved against ``self.workspace_root``.
            patch: Unified diff text, fed to ``patch`` on standard input.
            **kwargs: Accepted and ignored.

        Returns:
            A SUCCESS result naming the patched file, or an ERROR result when
            the path is rejected, the target is missing or not a regular
            file, the patch text is empty, the ``patch`` executable is
            unavailable, the run exceeds ``_PATCH_TIMEOUT`` seconds, or
            ``patch`` exits with a non-zero status.
        """

        def fail(message: str) -> ToolResult:
            # Single place that shapes every ERROR result of this tool.
            return ToolResult(
                tool_call_id=tool_call_id,
                tool_name=self.name,
                status=ToolResultStatus.ERROR,
                error=message,
            )

        try:
            safe_path = resolve_safe_path(file_path, self.workspace_root)
        except PathSecurityError as exc:
            return fail(str(exc))

        if not safe_path.exists():
            return fail(f"File not found: {safe_path}")
        # exists() is also true for directories; catch that here with a clear
        # message instead of letting the external tool fail on it.
        if not safe_path.is_file():
            return fail(f"Not a regular file: {safe_path}")

        # An empty diff cannot change anything. Rejecting it here gives a
        # clear error regardless of how the installed `patch` treats it.
        if not patch.strip():
            return fail("patch is empty")

        try:
            result = subprocess.run(
                # Argument list (no shell); the diff itself travels via stdin.
                ["patch", "--forward", "--no-backup-if-mismatch", str(safe_path)],
                input=patch,
                capture_output=True,
                text=True,
                timeout=_PATCH_TIMEOUT,
                cwd=self.workspace_root,
            )
        except subprocess.TimeoutExpired:
            return fail(f"Patch timed out after {_PATCH_TIMEOUT}s")
        except FileNotFoundError:
            return fail("'patch' command not found on system")

        if result.returncode != 0:
            # Prefer stderr, falling back to stdout when stderr is empty.
            return fail(
                f"Patch failed (exit {result.returncode}): {result.stderr or result.stdout}"
            )

        # Report a workspace-relative path when one can be derived.
        try:
            rel_path = safe_path.relative_to(self.workspace_root)
        except ValueError:
            rel_path = safe_path

        return ToolResult(
            tool_call_id=tool_call_id,
            tool_name=self.name,
            status=ToolResultStatus.SUCCESS,
            output=f"Patch applied to {rel_path}",
        )
|