Implement 6 new agent tools — write_file, make_dir, delete_file, str_replace, patch_apply, run_command — bringing the agent from read-only observer to active code modifier. All write/shell operations are gated through the existing permissions service. Also fix a bug where qwen3.5 thinking mode produces reasoning tokens but no content after tool results, causing the agent to silently exit. The loop now detects reasoning-only responses, retries twice, then injects a nudge message to break the model out of its thinking loop. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
146 lines
5.4 KiB
Python
146 lines
5.4 KiB
Python
"""Tests for edit tools: str_replace and patch_apply (Phase 6)."""
|
|
|
|
from pathlib import Path
|
|
from unittest.mock import patch as mock_patch
|
|
|
|
import pytest
|
|
|
|
from app.models.config import AppConfig, load_config
|
|
from app.models.tool_call import ToolResultStatus
|
|
from app.tools.edit import PatchApplyTool, StrReplaceTool
|
|
|
|
|
|
@pytest.fixture
def config() -> AppConfig:
    """Provide the application configuration produced by ``load_config()``."""
    app_config = load_config()
    return app_config
|
|
|
|
|
|
@pytest.fixture
def tmp_workspace(tmp_path: Path, config: AppConfig) -> tuple[Path, AppConfig]:
    """Return ``(workspace, config)`` with the config rooted at pytest's tmp dir.

    The ``config`` fixture's ``agent.workspace_root`` is overwritten with
    ``tmp_path`` before the pair is handed to the edit-tool tests.
    """
    workspace = tmp_path
    config.agent.workspace_root = workspace
    return (workspace, config)
|
|
|
|
|
|
# --- StrReplaceTool ---
|
|
|
|
|
|
class TestStrReplaceTool:
    """Checks on ``StrReplaceTool.run`` using files in a temporary workspace."""

    def test_replace_unique_match(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
        """Expect success and a rewritten file when ``old_str`` occurs once."""
        workspace, app_config = tmp_workspace
        target = workspace / "test.py"
        target.write_text("def hello():\n return 'hello'\n")

        arguments = {
            "file_path": "test.py",
            "old_str": "return 'hello'",
            "new_str": "return 'world'",
        }
        outcome = StrReplaceTool(workspace, app_config).run("tc-1", arguments)

        assert outcome.status == ToolResultStatus.SUCCESS
        assert "1 occurrence" in outcome.output
        assert target.read_text() == "def hello():\n return 'world'\n"

    def test_replace_no_match(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
        """Expect a "not found" error when ``old_str`` is absent from the file."""
        workspace, app_config = tmp_workspace
        target = workspace / "test.py"
        target.write_text("some content")

        arguments = {
            "file_path": "test.py",
            "old_str": "nonexistent",
            "new_str": "replacement",
        }
        outcome = StrReplaceTool(workspace, app_config).run("tc-2", arguments)

        assert outcome.status == ToolResultStatus.ERROR
        message = (outcome.error or "").lower()
        assert "not found" in message

    def test_replace_multiple_matches_fails(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
        """Expect an error mentioning "3 times" when ``old_str`` occurs thrice."""
        workspace, app_config = tmp_workspace
        target = workspace / "test.py"
        target.write_text("foo bar foo baz foo")

        arguments = {"file_path": "test.py", "old_str": "foo", "new_str": "qux"}
        outcome = StrReplaceTool(workspace, app_config).run("tc-3", arguments)

        assert outcome.status == ToolResultStatus.ERROR
        message = outcome.error or ""
        assert "3 times" in message

    def test_replace_nonexistent_file(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
        """Expect a "not found" error when the target file was never created."""
        workspace, app_config = tmp_workspace

        arguments = {"file_path": "missing.py", "old_str": "a", "new_str": "b"}
        outcome = StrReplaceTool(workspace, app_config).run("tc-4", arguments)

        assert outcome.status == ToolResultStatus.ERROR
        message = (outcome.error or "").lower()
        assert "not found" in message

    def test_replace_path_traversal_blocked(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
        """Expect an "outside" error for a ``../`` path leaving the workspace."""
        workspace, app_config = tmp_workspace

        arguments = {"file_path": "../../etc/passwd", "old_str": "a", "new_str": "b"}
        outcome = StrReplaceTool(workspace, app_config).run("tc-5", arguments)

        assert outcome.status == ToolResultStatus.ERROR
        message = (outcome.error or "").lower()
        assert "outside" in message
|
|
|
|
|
|
# --- PatchApplyTool ---
|
|
|
|
|
|
class TestPatchApplyTool:
    """Checks on ``PatchApplyTool.run`` using files in a temporary workspace."""

    def test_apply_valid_patch(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
        """Expect success and the modified line on disk for a well-formed diff."""
        ws, cfg = tmp_workspace
        (ws / "target.txt").write_text("line1\nline2\nline3\n")
        # Unified diff replacing the middle line; context lines keep their
        # leading single space.
        patch_text = (
            "--- a/target.txt\n"
            "+++ b/target.txt\n"
            "@@ -1,3 +1,3 @@\n"
            " line1\n"
            "-line2\n"
            "+line2_modified\n"
            " line3\n"
        )
        tool = PatchApplyTool(ws, cfg)
        result = tool.run("tc-1", {"file_path": "target.txt", "patch": patch_text})
        assert result.status == ToolResultStatus.SUCCESS
        assert "line2_modified" in (ws / "target.txt").read_text()

    def test_apply_bad_patch(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
        """Expect an error status when the patch text is not a diff at all."""
        ws, cfg = tmp_workspace
        (ws / "target.txt").write_text("line1\nline2\n")
        tool = PatchApplyTool(ws, cfg)
        result = tool.run(
            "tc-2",
            {"file_path": "target.txt", "patch": "this is not a valid patch"},
        )
        assert result.status == ToolResultStatus.ERROR

    def test_apply_nonexistent_file(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
        """Expect a "not found" error when the target file was never created."""
        ws, cfg = tmp_workspace
        tool = PatchApplyTool(ws, cfg)
        result = tool.run(
            "tc-3",
            {"file_path": "missing.txt", "patch": "--- a\n+++ b\n"},
        )
        assert result.status == ToolResultStatus.ERROR
        assert "not found" in (result.error or "").lower()

    def test_apply_path_traversal_blocked(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
        """Expect an "outside" error for a ``../`` path leaving the workspace."""
        ws, cfg = tmp_workspace
        tool = PatchApplyTool(ws, cfg)
        result = tool.run(
            "tc-4",
            {"file_path": "../../etc/passwd", "patch": "--- a\n+++ b\n"},
        )
        assert result.status == ToolResultStatus.ERROR
        assert "outside" in (result.error or "").lower()

    def test_apply_timeout(self, tmp_workspace: tuple[Path, AppConfig]) -> None:
        """Expect a "timed out" error when the patched ``subprocess.run`` times out."""
        # Function-scope import replaces the inline ``__import__("subprocess")``
        # expression, keeping the ``with`` line readable.
        import subprocess

        ws, cfg = tmp_workspace
        (ws / "target.txt").write_text("content\n")
        tool = PatchApplyTool(ws, cfg)
        # Passing an exception instance as ``side_effect`` makes the mock raise it.
        timeout_error = subprocess.TimeoutExpired("patch", 30)
        with mock_patch("app.tools.edit.subprocess.run", side_effect=timeout_error):
            result = tool.run(
                "tc-5",
                {"file_path": "target.txt", "patch": "--- a\n+++ b\n"},
            )
        assert result.status == ToolResultStatus.ERROR
        assert "timed out" in (result.error or "").lower()
|