"""Search tools: grep_files and find_files.""" import subprocess from typing import Any from pydantic import BaseModel, Field from app.models.tool_call import ToolResult, ToolResultStatus from app.tools.base import BaseTool from app.utils.file_helpers import PathSecurityError, resolve_safe_path _GREP_MAX_MATCHES = 100 _FIND_MAX_RESULTS = 200 class GrepFilesParams(BaseModel): """Parameters for the grep_files tool.""" pattern: str = Field(description="Regular expression pattern to search for") path: str = Field(default=".", description="Directory or file to search in (relative to workspace root)") file_pattern: str | None = Field( default=None, description="Glob pattern to filter files (e.g. '*.py')" ) class GrepFilesTool(BaseTool): """Search file contents using grep.""" name = "grep_files" description = ( "Search for a regex pattern in file contents. Returns matching lines with " "file paths and line numbers." ) params_model = GrepFilesParams def execute( self, *, tool_call_id: str, pattern: str, path: str = ".", file_pattern: str | None = None, **kwargs: Any, ) -> ToolResult: try: safe_path = resolve_safe_path(path, self.workspace_root) except PathSecurityError as exc: return ToolResult( tool_call_id=tool_call_id, tool_name=self.name, status=ToolResultStatus.ERROR, error=str(exc), ) cmd = ["grep", "-rn", pattern, str(safe_path)] if file_pattern: cmd.insert(3, f"--include={file_pattern}") try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=30, ) except subprocess.TimeoutExpired: return ToolResult( tool_call_id=tool_call_id, tool_name=self.name, status=ToolResultStatus.ERROR, error="grep timed out after 30 seconds", ) if result.returncode == 1: # No matches — not an error return ToolResult( tool_call_id=tool_call_id, tool_name=self.name, status=ToolResultStatus.SUCCESS, output="No matches found.", ) if result.returncode >= 2: return ToolResult( tool_call_id=tool_call_id, tool_name=self.name, status=ToolResultStatus.ERROR, error=result.stderr.strip() or f"grep exited with code {result.returncode}", ) # Truncate to max matches lines = result.stdout.splitlines() output_lines = lines[:_GREP_MAX_MATCHES] if len(lines) > _GREP_MAX_MATCHES: output_lines.append(f"\n... truncated ({len(lines)} matches, showing {_GREP_MAX_MATCHES})") return ToolResult( tool_call_id=tool_call_id, tool_name=self.name, status=ToolResultStatus.SUCCESS, output="\n".join(output_lines), ) class FindFilesParams(BaseModel): """Parameters for the find_files tool.""" pattern: str = Field(description="File name pattern to search for (e.g. '*.py', 'config.yaml')") path: str = Field(default=".", description="Directory to search in (relative to workspace root)") class FindFilesTool(BaseTool): """Find files by name pattern.""" name = "find_files" description = "Search for files matching a name pattern. Returns relative file paths." params_model = FindFilesParams def execute( self, *, tool_call_id: str, pattern: str, path: str = ".", **kwargs: Any, ) -> ToolResult: try: safe_path = resolve_safe_path(path, self.workspace_root) except PathSecurityError as exc: return ToolResult( tool_call_id=tool_call_id, tool_name=self.name, status=ToolResultStatus.ERROR, error=str(exc), ) cmd = ["find", str(safe_path), "-name", pattern, "-type", "f"] try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=30, ) except subprocess.TimeoutExpired: return ToolResult( tool_call_id=tool_call_id, tool_name=self.name, status=ToolResultStatus.ERROR, error="find timed out after 30 seconds", ) if result.returncode != 0: return ToolResult( tool_call_id=tool_call_id, tool_name=self.name, status=ToolResultStatus.ERROR, error=result.stderr.strip() or f"find exited with code {result.returncode}", ) # Make paths relative to workspace root and truncate lines = result.stdout.strip().splitlines() if result.stdout.strip() else [] relative_lines: list[str] = [] for line in lines[:_FIND_MAX_RESULTS]: try: from pathlib import Path rel = Path(line).relative_to(self.workspace_root) relative_lines.append(str(rel)) except ValueError: relative_lines.append(line) if len(lines) > _FIND_MAX_RESULTS: relative_lines.append(f"\n... truncated ({len(lines)} results, showing {_FIND_MAX_RESULTS})") output = "\n".join(relative_lines) if relative_lines else "No files found." return ToolResult( tool_call_id=tool_call_id, tool_name=self.name, status=ToolResultStatus.SUCCESS, output=output, )