Files
SneakyCode/app/tools/filesystem.py
Phillip Tarrant 05754fe06b feat: add read_many_files tool for batch file reading
Reduces LLM round-trips by allowing multiple files to be read in a
single tool call. Uses best-effort error handling so partial failures
still return successful reads.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 21:56:53 -05:00

381 lines
12 KiB
Python

"""Filesystem tools: read_file, list_dir, write_file, make_dir, delete_file."""
from pathlib import Path
from typing import Any
from pydantic import BaseModel, Field
from app.models.tool_call import ToolResult, ToolResultStatus
from app.tools.base import BaseTool
from app.utils.file_helpers import (
BinaryFileError,
FileSizeError,
PathSecurityError,
resolve_safe_path,
safe_read_file,
safe_write_file,
)
class ReadFileParams(BaseModel):
"""Parameters for the read_file tool."""
file_path: str = Field(description="Path to the file to read (relative to workspace root)")
class ReadManyFilesParams(BaseModel):
"""Parameters for the read_many_files tool."""
file_paths: list[str] = Field(description="List of file paths to read (relative to workspace root)")
class ReadFileTool(BaseTool):
"""Read the contents of a file within the workspace."""
name = "read_file"
description = "Read the full contents of a text file. Returns the file content as a string."
params_model = ReadFileParams
def execute(self, *, tool_call_id: str, file_path: str, **kwargs: Any) -> ToolResult:
fs_config = self.config.tools.filesystem
try:
content = safe_read_file(
file_path,
self.workspace_root,
max_size_bytes=fs_config.max_file_size_bytes,
check_binary=fs_config.binary_detection,
)
except PathSecurityError as exc:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=str(exc),
)
except FileNotFoundError as exc:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=str(exc),
)
except FileSizeError as exc:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=str(exc),
)
except BinaryFileError as exc:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=str(exc),
)
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.SUCCESS,
output=content,
)
class ReadManyFilesTool(BaseTool):
"""Read contents of multiple files at once."""
name = "read_many_files"
description = (
"Read contents of multiple files at once. Returns each file's content "
"prefixed with its path header."
)
params_model = ReadManyFilesParams
def execute(self, *, tool_call_id: str, file_paths: list[str], **kwargs: Any) -> ToolResult:
if not file_paths:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error="file_paths list is empty",
)
fs_config = self.config.tools.filesystem
sections: list[str] = []
success_count = 0
for fp in file_paths:
try:
content = safe_read_file(
fp,
self.workspace_root,
max_size_bytes=fs_config.max_file_size_bytes,
check_binary=fs_config.binary_detection,
)
sections.append(f"=== {fp} ===\n{content}")
success_count += 1
except (PathSecurityError, FileNotFoundError, FileSizeError, BinaryFileError) as exc:
sections.append(f"=== {fp} ===\n[ERROR] {exc}")
if success_count == 0:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error="All files failed to read:\n" + "\n".join(sections),
)
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.SUCCESS,
output="\n".join(sections),
)
class ListDirParams(BaseModel):
"""Parameters for the list_dir tool."""
directory_path: str = Field(
default=".", description="Path to the directory to list (relative to workspace root)"
)
recursive: bool = Field(default=False, description="If true, list entries recursively")
_MAX_ENTRIES = 500
class ListDirTool(BaseTool):
"""List files and directories within the workspace."""
name = "list_dir"
description = (
"List the contents of a directory. Directories are suffixed with '/'. "
"Results are sorted with directories first, then files."
)
params_model = ListDirParams
def execute(
self, *, tool_call_id: str, directory_path: str = ".", recursive: bool = False, **kwargs: Any
) -> ToolResult:
try:
safe_path = resolve_safe_path(directory_path, self.workspace_root)
except PathSecurityError as exc:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=str(exc),
)
if not safe_path.is_dir():
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=f"Not a directory: {safe_path}",
)
entries: list[Path] = []
if recursive:
entries = list(safe_path.rglob("*"))
else:
entries = list(safe_path.iterdir())
# Sort: directories first, then files, alphabetical within each group
dirs = sorted([e for e in entries if e.is_dir()])
files = sorted([e for e in entries if e.is_file()])
sorted_entries = dirs + files
lines: list[str] = []
for entry in sorted_entries[:_MAX_ENTRIES]:
try:
rel = entry.relative_to(self.workspace_root)
except ValueError:
rel = entry
suffix = "/" if entry.is_dir() else ""
lines.append(f"{rel}{suffix}")
if len(sorted_entries) > _MAX_ENTRIES:
lines.append(f"\n... truncated ({len(sorted_entries)} total entries, showing {_MAX_ENTRIES})")
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.SUCCESS,
output="\n".join(lines),
)
class WriteFileParams(BaseModel):
"""Parameters for the write_file tool."""
file_path: str = Field(description="Path to the file to write (relative to workspace root)")
content: str = Field(description="Content to write to the file")
class WriteFileTool(BaseTool):
"""Write content to a file within the workspace."""
name = "write_file"
description = (
"Write text content to a file. Creates parent directories if needed. "
"Overwrites existing file content."
)
params_model = WriteFileParams
def execute(self, *, tool_call_id: str, file_path: str, content: str, **kwargs: Any) -> ToolResult:
fs_config = self.config.tools.filesystem
try:
safe_path = safe_write_file(
file_path,
content,
self.workspace_root,
max_size_bytes=fs_config.max_file_size_bytes,
)
except PathSecurityError as exc:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=str(exc),
)
except FileSizeError as exc:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=str(exc),
)
try:
rel_path = safe_path.relative_to(self.workspace_root)
except ValueError:
rel_path = safe_path
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.SUCCESS,
output=f"Wrote {len(content)} characters to {rel_path}",
)
class MakeDirParams(BaseModel):
"""Parameters for the make_dir tool."""
directory_path: str = Field(description="Path to the directory to create (relative to workspace root)")
class MakeDirTool(BaseTool):
"""Create a directory (and any missing parents) within the workspace."""
name = "make_dir"
description = "Create a directory and any necessary parent directories."
params_model = MakeDirParams
def execute(self, *, tool_call_id: str, directory_path: str, **kwargs: Any) -> ToolResult:
try:
safe_path = resolve_safe_path(directory_path, self.workspace_root)
except PathSecurityError as exc:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=str(exc),
)
if safe_path.exists() and not safe_path.is_dir():
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=f"Path exists and is not a directory: {safe_path}",
)
try:
safe_path.mkdir(parents=True, exist_ok=True)
except OSError as exc:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=f"Failed to create directory: {exc}",
)
try:
rel_path = safe_path.relative_to(self.workspace_root)
except ValueError:
rel_path = safe_path
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.SUCCESS,
output=f"Created directory: {rel_path}",
)
class DeleteFileParams(BaseModel):
"""Parameters for the delete_file tool."""
file_path: str = Field(description="Path to the file to delete (relative to workspace root)")
class DeleteFileTool(BaseTool):
"""Delete a file within the workspace. Refuses to delete directories."""
name = "delete_file"
description = "Delete a single file. Does not delete directories."
params_model = DeleteFileParams
def execute(self, *, tool_call_id: str, file_path: str, **kwargs: Any) -> ToolResult:
try:
safe_path = resolve_safe_path(file_path, self.workspace_root)
except PathSecurityError as exc:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=str(exc),
)
if safe_path.is_dir():
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=f"Path is a directory, not a file: {safe_path}",
)
if not safe_path.exists():
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=f"File not found: {safe_path}",
)
try:
safe_path.unlink()
except OSError as exc:
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.ERROR,
error=f"Failed to delete file: {exc}",
)
try:
rel_path = safe_path.relative_to(self.workspace_root)
except ValueError:
rel_path = safe_path
return ToolResult(
tool_call_id=tool_call_id,
tool_name=self.name,
status=ToolResultStatus.SUCCESS,
output=f"Deleted: {rel_path}",
)