feat: add tool-level file cache with LRU eviction and mtime invalidation

Introduces FileCache (OrderedDict LRU, st_mtime_ns validation) to avoid
redundant disk reads and duplicate content in conversation context.
Read tools return a short "[Cached]" message on cache hit instead of
resending unchanged file content, saving tokens. Write/edit/delete tools
invalidate affected paths; str_replace pre-warms the cache after edits.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-11 22:26:51 -05:00
parent 2c532adbbc
commit d829e6553c
8 changed files with 623 additions and 10 deletions

185
app/utils/file_cache.py Normal file
View File

@@ -0,0 +1,185 @@
"""File cache with LRU eviction and mtime-based invalidation."""
from __future__ import annotations
from collections import OrderedDict
from dataclasses import dataclass, field
from pathlib import Path
from app.utils.file_helpers import (
BinaryFileError,
FileSizeError,
PathSecurityError,
check_file_size,
is_binary_file,
resolve_safe_path,
)
@dataclass(slots=True)
class CacheEntry:
"""A cached file's content and modification timestamp."""
content: str
mtime_ns: int
@dataclass
class CacheStats:
    """Counters describing how a FileCache instance has been used."""

    # Lookups answered from the cache.
    hits: int = 0
    # Lookups that returned nothing (absent, stale, or un-stat-able entry).
    misses: int = 0
    # Entries dropped because they were stale or explicitly invalidated.
    invalidations: int = 0
    # Entries dropped to stay within the capacity limit.
    evictions: int = 0

    @property
    def hit_rate(self) -> float:
        """Fraction of lookups that were hits, in the range 0.0 to 1.0.

        Returns 0.0 when no lookups have been recorded yet, which avoids a
        division by zero.
        """
        lookups = self.hits + self.misses
        return self.hits / lookups if lookups else 0.0
class FileCache:
    """LRU file-content cache with mtime-based invalidation.

    Entries are keyed by the ``Path`` object passed in; the cache does not
    resolve paths itself, so callers should pass resolved absolute paths to
    avoid caching the same file under two keys. Each lookup performs a
    ``stat()`` call to verify the file hasn't changed on disk — if the
    nanosecond mtime differs the entry is dropped and the caller gets a
    cache miss.

    Not thread-safe (single-threaded agent loop).
    """

    def __init__(self, max_entries: int = 128) -> None:
        """Create an empty cache holding at most *max_entries* files.

        ``max_entries=0`` is allowed and yields a cache that never retains
        anything.

        Raises:
            ValueError: If *max_entries* is negative. A negative limit can
                never be satisfied and would make the eviction loop in
                :meth:`put` pop from an empty mapping.
        """
        if max_entries < 0:
            raise ValueError(f"max_entries must be >= 0, got {max_entries}")
        self._max_entries = max_entries
        self._entries: OrderedDict[Path, CacheEntry] = OrderedDict()
        self._stats = CacheStats()

    # -- public API --------------------------------------------------

    def get(self, path: Path) -> str | None:
        """Return cached content if *path* hasn't changed, else ``None``.

        A ``stat()`` call checks ``st_mtime_ns``; on mismatch the stale
        entry is removed and counted as both an invalidation and a miss.
        If the file cannot be stat'ed the entry is removed and only a miss
        is counted.
        """
        entry = self._entries.get(path)
        if entry is None:
            self._stats.misses += 1
            return None

        try:
            current_mtime_ns = path.stat().st_mtime_ns
        except OSError:
            # File gone or unreadable — drop the entry and report a miss.
            self._remove(path)
            self._stats.misses += 1
            return None

        if current_mtime_ns != entry.mtime_ns:
            self._remove(path)
            self._stats.invalidations += 1
            self._stats.misses += 1
            return None

        # Cache hit — move to end (most-recently used).
        self._entries.move_to_end(path)
        self._stats.hits += 1
        return entry.content

    def put(self, path: Path, content: str) -> None:
        """Store *content* for *path* with its current ``st_mtime_ns``.

        The new or updated entry becomes the most-recently-used one, and
        least-recently-used entries are evicted while the cache is over
        capacity. If *path* cannot be stat'ed nothing is stored, and any
        existing entry for *path* is dropped so outdated content cannot be
        served later.
        """
        try:
            mtime_ns = path.stat().st_mtime_ns
        except OSError:
            # Can't stat — don't cache, and don't keep an older snapshot
            # that the caller was trying to replace.
            self._remove(path)
            return

        # Assigning to an existing key keeps its old position, so always
        # move it to the end; for a brand-new key this is a no-op.
        self._entries[path] = CacheEntry(content=content, mtime_ns=mtime_ns)
        self._entries.move_to_end(path)

        # Evict LRU entries (front of the ordered dict) if over capacity.
        while len(self._entries) > self._max_entries:
            self._entries.popitem(last=False)
            self._stats.evictions += 1

    def invalidate(self, path: Path) -> None:
        """Remove *path* from the cache if present, counting an invalidation."""
        if self._entries.pop(path, None) is not None:
            self._stats.invalidations += 1

    def clear(self) -> None:
        """Remove all entries. Statistics are left untouched."""
        self._entries.clear()

    @property
    def stats(self) -> CacheStats:
        """Return the running cache statistics (the live object, not a copy)."""
        return self._stats

    def __len__(self) -> int:
        """Return the number of entries currently cached."""
        return len(self._entries)

    # -- internals ---------------------------------------------------

    def _remove(self, path: Path) -> None:
        """Delete an entry without bumping invalidation stats."""
        self._entries.pop(path, None)
def cached_read_file(
    path: str | Path,
    workspace_root: Path,
    max_size_bytes: int = 1_048_576,
    check_binary: bool = True,
    cache: FileCache | None = None,
) -> str:
    """Read a file with full security checks, using *cache* when available.

    Path resolution, the existence check, the size check and (optionally)
    the binary check run on **every** call, before the cache is consulted —
    only the ``Path.read_text()`` I/O is skipped on a cache hit.

    When *cache* is ``None`` the file is always read from disk; this is
    intended to match :func:`~app.utils.file_helpers.safe_read_file`.

    On a cache miss the file's mtime is sampled before the read and
    re-checked after the content has been stored. If the file changed in
    between, the fresh entry is invalidated again so that content read
    mid-write is never cached under a newer timestamp.

    Raises:
        PathSecurityError: If the path escapes the workspace (raised by
            ``resolve_safe_path``).
        FileSizeError: If the file is too large (raised by
            ``check_file_size``).
        BinaryFileError: If the file is binary and *check_binary* is True.
        FileNotFoundError: If the file does not exist.
        UnicodeDecodeError: If the file is not valid UTF-8 text.
    """
    safe_path = resolve_safe_path(path, workspace_root)
    if not safe_path.exists():
        raise FileNotFoundError(f"File not found: {safe_path}")
    check_file_size(safe_path, max_size_bytes)
    if check_binary and is_binary_file(safe_path):
        raise BinaryFileError(f"File appears to be binary: {safe_path}")

    # No cache — plain read from disk.
    if cache is None:
        return safe_path.read_text(encoding="utf-8")

    # Try cache.
    cached = cache.get(safe_path)
    if cached is not None:
        return cached

    # Cache miss — read from disk, bracketing the read with mtime samples.
    mtime_before_ns = safe_path.stat().st_mtime_ns
    content = safe_path.read_text(encoding="utf-8")
    cache.put(safe_path, content)

    # put() stamps the entry with the mtime it sees *after* the read. If the
    # file was modified (or removed) since the first sample, that stamp may
    # not describe the content we hold, so drop the entry.
    try:
        unchanged = safe_path.stat().st_mtime_ns == mtime_before_ns
    except OSError:
        unchanged = False
    if not unchanged:
        cache.invalidate(safe_path)

    return content