feat: Add auto-closing files via LRU metric (#2881)
This commit is contained in:
@@ -338,3 +338,6 @@ REDIS_INCLUDE = "include"
|
||||
REDIS_EXCLUDE = "exclude"
|
||||
REDIS_SET_DEFAULT_VAL = "None"
|
||||
REDIS_DEFAULT_CACHE_PREFIX = "letta_cache"
|
||||
|
||||
# TODO: This is temporary, eventually use token-based eviction
|
||||
MAX_FILES_OPEN = 5
|
||||
|
||||
@@ -1370,13 +1370,17 @@ class SyncServer(Server):
|
||||
)
|
||||
await self.agent_manager.delete_agent_async(agent_id=sleeptime_agent_state.id, actor=actor)
|
||||
|
||||
async def _upsert_file_to_agent(self, agent_id: str, text: str, file_id: str, file_name: str, actor: User) -> None:
|
||||
async def _upsert_file_to_agent(self, agent_id: str, text: str, file_id: str, file_name: str, actor: User) -> List[str]:
|
||||
"""
|
||||
Internal method to create or update a file <-> agent association
|
||||
|
||||
Returns:
|
||||
List of file names that were closed due to LRU eviction
|
||||
"""
|
||||
await self.file_agent_manager.attach_file(
|
||||
file_agent, closed_files = await self.file_agent_manager.attach_file(
|
||||
agent_id=agent_id, file_id=file_id, file_name=file_name, actor=actor, visible_content=text
|
||||
)
|
||||
return closed_files
|
||||
|
||||
async def _remove_file_from_agent(self, agent_id: str, file_id: str, actor: User) -> None:
|
||||
"""
|
||||
@@ -1407,7 +1411,14 @@ class SyncServer(Server):
|
||||
logger.info(f"Inserting document into context window for source: {source_id}")
|
||||
logger.info(f"Attached agents: {[a.id for a in agent_states]}")
|
||||
|
||||
await asyncio.gather(*(self._upsert_file_to_agent(agent_state.id, text, file_id, file_name, actor) for agent_state in agent_states))
|
||||
# Collect any files that were closed due to LRU eviction during bulk attach
|
||||
all_closed_files = await asyncio.gather(
|
||||
*(self._upsert_file_to_agent(agent_state.id, text, file_id, file_name, actor) for agent_state in agent_states)
|
||||
)
|
||||
# Flatten and log if any files were closed
|
||||
closed_files = [file for closed_list in all_closed_files for file in closed_list]
|
||||
if closed_files:
|
||||
logger.info(f"LRU eviction closed {len(closed_files)} files during bulk attach: {closed_files}")
|
||||
|
||||
return agent_states
|
||||
|
||||
@@ -1423,12 +1434,17 @@ class SyncServer(Server):
|
||||
if len(texts) != len(file_ids):
|
||||
raise ValueError(f"Mismatch between number of texts ({len(texts)}) and file ids ({len(file_ids)})")
|
||||
|
||||
await asyncio.gather(
|
||||
# Collect any files that were closed due to LRU eviction during bulk insert
|
||||
all_closed_files = await asyncio.gather(
|
||||
*(
|
||||
self._upsert_file_to_agent(agent_state.id, text, file_id, file_name, actor)
|
||||
for text, file_id, file_name in zip(texts, file_ids, file_names)
|
||||
)
|
||||
)
|
||||
# Flatten and log if any files were closed
|
||||
closed_files = [file for closed_list in all_closed_files for file in closed_list]
|
||||
if closed_files:
|
||||
logger.info(f"LRU eviction closed {len(closed_files)} files during bulk insert: {closed_files}")
|
||||
|
||||
async def remove_file_from_context_windows(self, source_id: str, file_id: str, actor: User) -> None:
|
||||
"""
|
||||
|
||||
@@ -3,6 +3,7 @@ from typing import List, Optional
|
||||
|
||||
from sqlalchemy import and_, func, select, update
|
||||
|
||||
from letta.constants import MAX_FILES_OPEN
|
||||
from letta.orm.errors import NoResultFound
|
||||
from letta.orm.files_agents import FileAgent as FileAgentModel
|
||||
from letta.otel.tracing import trace_method
|
||||
@@ -27,51 +28,66 @@ class FileAgentManager:
|
||||
actor: PydanticUser,
|
||||
is_open: bool = True,
|
||||
visible_content: Optional[str] = None,
|
||||
) -> PydanticFileAgent:
|
||||
) -> tuple[PydanticFileAgent, List[str]]:
|
||||
"""
|
||||
Idempotently attach *file_id* to *agent_id*.
|
||||
Idempotently attach *file_id* to *agent_id* with LRU enforcement.
|
||||
|
||||
• If the row already exists → update `is_open`, `visible_content`
|
||||
and always refresh `last_accessed_at`.
|
||||
• Otherwise create a brand-new association.
|
||||
• If is_open=True, enforces MAX_FILES_OPEN using LRU eviction.
|
||||
|
||||
Returns:
|
||||
Tuple of (file_agent, closed_file_names)
|
||||
"""
|
||||
async with db_registry.async_session() as session:
|
||||
query = select(FileAgentModel).where(
|
||||
and_(
|
||||
FileAgentModel.agent_id == agent_id,
|
||||
FileAgentModel.file_id == file_id,
|
||||
FileAgentModel.file_name == file_name,
|
||||
FileAgentModel.organization_id == actor.organization_id,
|
||||
if is_open:
|
||||
# Use the efficient LRU + open method
|
||||
closed_files, was_already_open = await self.enforce_max_open_files_and_open(
|
||||
agent_id=agent_id, file_id=file_id, file_name=file_name, actor=actor, visible_content=visible_content or ""
|
||||
)
|
||||
|
||||
# Get the updated file agent to return
|
||||
file_agent = await self.get_file_agent_by_id(agent_id=agent_id, file_id=file_id, actor=actor)
|
||||
return file_agent, closed_files
|
||||
else:
|
||||
# Original logic for is_open=False
|
||||
async with db_registry.async_session() as session:
|
||||
query = select(FileAgentModel).where(
|
||||
and_(
|
||||
FileAgentModel.agent_id == agent_id,
|
||||
FileAgentModel.file_id == file_id,
|
||||
FileAgentModel.file_name == file_name,
|
||||
FileAgentModel.organization_id == actor.organization_id,
|
||||
)
|
||||
)
|
||||
)
|
||||
existing = await session.scalar(query)
|
||||
existing = await session.scalar(query)
|
||||
|
||||
now_ts = datetime.now(timezone.utc)
|
||||
now_ts = datetime.now(timezone.utc)
|
||||
|
||||
if existing:
|
||||
# update only the fields that actually changed
|
||||
if existing.is_open != is_open:
|
||||
existing.is_open = is_open
|
||||
if existing:
|
||||
# update only the fields that actually changed
|
||||
if existing.is_open != is_open:
|
||||
existing.is_open = is_open
|
||||
|
||||
if visible_content is not None and existing.visible_content != visible_content:
|
||||
existing.visible_content = visible_content
|
||||
if visible_content is not None and existing.visible_content != visible_content:
|
||||
existing.visible_content = visible_content
|
||||
|
||||
existing.last_accessed_at = now_ts
|
||||
existing.last_accessed_at = now_ts
|
||||
|
||||
await existing.update_async(session, actor=actor)
|
||||
return existing.to_pydantic()
|
||||
await existing.update_async(session, actor=actor)
|
||||
return existing.to_pydantic(), []
|
||||
|
||||
assoc = FileAgentModel(
|
||||
agent_id=agent_id,
|
||||
file_id=file_id,
|
||||
file_name=file_name,
|
||||
organization_id=actor.organization_id,
|
||||
is_open=is_open,
|
||||
visible_content=visible_content,
|
||||
last_accessed_at=now_ts,
|
||||
)
|
||||
await assoc.create_async(session, actor=actor)
|
||||
return assoc.to_pydantic()
|
||||
assoc = FileAgentModel(
|
||||
agent_id=agent_id,
|
||||
file_id=file_id,
|
||||
file_name=file_name,
|
||||
organization_id=actor.organization_id,
|
||||
is_open=is_open,
|
||||
visible_content=visible_content,
|
||||
last_accessed_at=now_ts,
|
||||
)
|
||||
await assoc.create_async(session, actor=actor)
|
||||
return assoc.to_pydantic(), []
|
||||
|
||||
@enforce_types
|
||||
@trace_method
|
||||
@@ -246,6 +262,129 @@ class FileAgentManager:
|
||||
await session.execute(stmt)
|
||||
await session.commit()
|
||||
|
||||
@enforce_types
|
||||
@trace_method
|
||||
async def mark_access_bulk(self, *, agent_id: str, file_names: List[str], actor: PydanticUser) -> None:
    """Refresh `last_accessed_at` for several of an agent's files with one UPDATE.

    Rows are matched on the agent id, the actor's organization id and
    membership of `file_names`; every matched row gets `last_accessed_at`
    set to the database's `now()`. No rows are loaded into the session.

    Args:
        agent_id: ID of the agent whose file associations are touched.
        file_names: Names of the files to mark as accessed. When empty the
            method returns immediately without opening a session.
        actor: User performing the action; only its `organization_id` is read.
    """
    if file_names:
        # Building the statement is pure, so it can happen before the session opens.
        touch_stmt = (
            update(FileAgentModel)
            .where(
                FileAgentModel.agent_id == agent_id,
                FileAgentModel.file_name.in_(file_names),
                FileAgentModel.organization_id == actor.organization_id,
            )
            .values(last_accessed_at=func.now())
        )

        async with db_registry.async_session() as session:
            await session.execute(touch_stmt)
            await session.commit()
|
||||
|
||||
@enforce_types
|
||||
@trace_method
|
||||
async def enforce_max_open_files_and_open(
    self, *, agent_id: str, file_id: str, file_name: str, actor: PydanticUser, visible_content: str
) -> tuple[List[str], bool]:
    """
    Open `file_name` for an agent, first closing the least recently accessed
    open files so that at most MAX_FILES_OPEN files end up open.

    All reads and writes go through one session. The target association is
    looked up by `file_name`; `file_id` is only used when a new association
    has to be created. Evicted rows are closed by `file_id` and have their
    `visible_content` cleared.

    Args:
        agent_id: ID of the agent
        file_id: ID of the file to open
        file_name: Name of the file to open
        actor: User performing the action
        visible_content: Content to set for the opened file

    Returns:
        Tuple of (closed_file_names, file_was_already_open)
    """
    org_id = actor.organization_id

    async with db_registry.async_session() as session:
        # Every open file of this agent, least recently accessed first.
        open_rows_stmt = (
            select(FileAgentModel)
            .where(
                and_(
                    FileAgentModel.agent_id == agent_id,
                    FileAgentModel.organization_id == org_id,
                    FileAgentModel.is_open.is_(True),
                )
            )
            .order_by(FileAgentModel.last_accessed_at.asc())
        )
        open_rows = (await session.execute(open_rows_stmt)).scalars().all()

        # The association we want to open, whether it is currently open or closed.
        target_stmt = select(FileAgentModel).where(
            and_(
                FileAgentModel.agent_id == agent_id,
                FileAgentModel.organization_id == org_id,
                FileAgentModel.file_name == file_name,
            )
        )
        target = await session.scalar(target_stmt)

        # Open files that compete with the target for a slot.
        competitors = [row for row in open_rows if row.file_name != file_name]

        # Captured before the target row is modified below.
        was_already_open = target is not None and target.is_open

        # One slot is reserved for the target; anything beyond that is evicted,
        # taking from the front of the list (the oldest entries).
        overflow = len(competitors) - (MAX_FILES_OPEN - 1)
        evicted = competitors[:overflow] if overflow > 0 else []
        evicted_names = [row.file_name for row in evicted]

        if evicted:
            # Close all evicted files with a single UPDATE.
            evict_stmt = (
                update(FileAgentModel)
                .where(
                    and_(
                        FileAgentModel.agent_id == agent_id,
                        FileAgentModel.file_id.in_([row.file_id for row in evicted]),
                        FileAgentModel.organization_id == org_id,
                    )
                )
                .values(is_open=False, visible_content=None)
            )
            await session.execute(evict_stmt)

        opened_at = datetime.now(timezone.utc)

        if target:
            # Existing association: reopen it and refresh its content and timestamp.
            target.is_open = True
            target.visible_content = visible_content
            target.last_accessed_at = opened_at
            await target.update_async(session, actor=actor)
        else:
            # No association yet: create one that is already open.
            created = FileAgentModel(
                agent_id=agent_id,
                file_id=file_id,
                file_name=file_name,
                organization_id=org_id,
                is_open=True,
                visible_content=visible_content,
                last_accessed_at=opened_at,
            )
            await created.create_async(session, actor=actor)

        return evicted_names, was_already_open
|
||||
|
||||
async def _get_association_by_file_id(self, session, agent_id: str, file_id: str, actor: PydanticUser) -> FileAgentModel:
|
||||
q = select(FileAgentModel).where(
|
||||
and_(
|
||||
|
||||
@@ -127,10 +127,18 @@ class LettaFileToolExecutor(ToolExecutor):
|
||||
content_lines = LineChunker().chunk_text(text=file.content, file_metadata=file, start=start, end=end)
|
||||
visible_content = "\n".join(content_lines)
|
||||
|
||||
await self.files_agents_manager.update_file_agent_by_id(
|
||||
agent_id=agent_state.id, file_id=file_id, actor=self.actor, is_open=True, visible_content=visible_content
|
||||
# Efficiently handle LRU eviction and file opening in a single transaction
|
||||
closed_files, was_already_open = await self.files_agents_manager.enforce_max_open_files_and_open(
|
||||
agent_id=agent_state.id, file_id=file_id, file_name=file_name, actor=self.actor, visible_content=visible_content
|
||||
)
|
||||
return f"Successfully opened file {file_name}, lines {start} to {end} are now visible in memory block <{file_name}>"
|
||||
|
||||
success_msg = f"Successfully opened file {file_name}, lines {start} to {end} are now visible in memory block <{file_name}>"
|
||||
if closed_files:
|
||||
success_msg += (
|
||||
f"\nNote: Closed {len(closed_files)} least recently used file(s) due to open file limit: {', '.join(closed_files)}"
|
||||
)
|
||||
|
||||
return success_msg
|
||||
|
||||
@trace_method
|
||||
async def close_file(self, agent_state: AgentState, file_name: str) -> str:
|
||||
@@ -256,10 +264,11 @@ class LettaFileToolExecutor(ToolExecutor):
|
||||
total_content_size = 0
|
||||
files_processed = 0
|
||||
files_skipped = 0
|
||||
files_with_matches = set() # Track files that had matches for LRU policy
|
||||
|
||||
# Use asyncio timeout to prevent hanging
|
||||
async def _search_files():
|
||||
nonlocal results, total_matches, total_content_size, files_processed, files_skipped
|
||||
nonlocal results, total_matches, total_content_size, files_processed, files_skipped, files_with_matches
|
||||
|
||||
for file_agent in file_agents:
|
||||
# Load file content
|
||||
@@ -336,6 +345,8 @@ class LettaFileToolExecutor(ToolExecutor):
|
||||
continue
|
||||
|
||||
if pattern_regex.search(line_content):
|
||||
# Mark this file as having matches for LRU tracking
|
||||
files_with_matches.add(file.file_name)
|
||||
context = self._get_context_lines(formatted_lines, match_line_num=line_num, context_lines=context_lines or 0)
|
||||
|
||||
# Format the match result
|
||||
@@ -353,6 +364,10 @@ class LettaFileToolExecutor(ToolExecutor):
|
||||
# Execute with timeout
|
||||
await asyncio.wait_for(_search_files(), timeout=self.GREP_TIMEOUT_SECONDS)
|
||||
|
||||
# Mark access for files that had matches
|
||||
if files_with_matches:
|
||||
await self.files_agents_manager.mark_access_bulk(agent_id=agent_state.id, file_names=list(files_with_matches), actor=self.actor)
|
||||
|
||||
# Format final results
|
||||
if not results or total_matches == 0:
|
||||
summary = f"No matches found for pattern: '{pattern}'"
|
||||
@@ -443,6 +458,12 @@ class LettaFileToolExecutor(ToolExecutor):
|
||||
passage_content = "\n".join(formatted_lines)
|
||||
results.append(f"{passage_header}\n{passage_content}")
|
||||
|
||||
# Mark access for files that had matches
|
||||
if files_with_passages:
|
||||
matched_file_names = [name for name in files_with_passages.keys() if name != "Unknown File"]
|
||||
if matched_file_names:
|
||||
await self.files_agents_manager.mark_access_bulk(agent_id=agent_state.id, file_names=matched_file_names, actor=self.actor)
|
||||
|
||||
# Create summary header
|
||||
file_count = len(files_with_passages)
|
||||
summary = f"Found {total_passages} semantic matches in {file_count} file{'s' if file_count != 1 else ''} for query: '{query}'"
|
||||
|
||||
@@ -668,7 +668,7 @@ def event_loop(request):
|
||||
|
||||
@pytest.fixture
|
||||
async def file_attachment(server, default_user, sarah_agent, default_file):
|
||||
assoc = await server.file_agent_manager.attach_file(
|
||||
assoc, closed_files = await server.file_agent_manager.attach_file(
|
||||
agent_id=sarah_agent.id,
|
||||
file_id=default_file.id,
|
||||
file_name=default_file.file_name,
|
||||
@@ -736,7 +736,7 @@ async def test_get_context_window_basic(
|
||||
created_agent, create_agent_request = comprehensive_test_agent_fixture
|
||||
|
||||
# Attach a file
|
||||
assoc = await server.file_agent_manager.attach_file(
|
||||
assoc, closed_files = await server.file_agent_manager.attach_file(
|
||||
agent_id=created_agent.id,
|
||||
file_id=default_file.id,
|
||||
file_name=default_file.file_name,
|
||||
@@ -6798,7 +6798,7 @@ async def test_create_mcp_server(server, default_user, event_loop):
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_attach_creates_association(server, default_user, sarah_agent, default_file):
|
||||
assoc = await server.file_agent_manager.attach_file(
|
||||
assoc, closed_files = await server.file_agent_manager.attach_file(
|
||||
agent_id=sarah_agent.id,
|
||||
file_id=default_file.id,
|
||||
file_name=default_file.file_name,
|
||||
@@ -6820,7 +6820,7 @@ async def test_attach_creates_association(server, default_user, sarah_agent, def
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_attach_is_idempotent(server, default_user, sarah_agent, default_file):
|
||||
a1 = await server.file_agent_manager.attach_file(
|
||||
a1, closed_files = await server.file_agent_manager.attach_file(
|
||||
agent_id=sarah_agent.id,
|
||||
file_id=default_file.id,
|
||||
file_name=default_file.file_name,
|
||||
@@ -6829,7 +6829,7 @@ async def test_attach_is_idempotent(server, default_user, sarah_agent, default_f
|
||||
)
|
||||
|
||||
# second attach with different params
|
||||
a2 = await server.file_agent_manager.attach_file(
|
||||
a2, closed_files = await server.file_agent_manager.attach_file(
|
||||
agent_id=sarah_agent.id,
|
||||
file_id=default_file.id,
|
||||
file_name=default_file.file_name,
|
||||
@@ -6971,3 +6971,287 @@ async def test_org_scoping(
|
||||
# other org should see nothing
|
||||
files = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=other_user_different_org)
|
||||
assert files == []
|
||||
|
||||
|
||||
# ======================================================================================================================
|
||||
# LRU File Management Tests
|
||||
# ======================================================================================================================
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_mark_access_bulk(server, default_user, sarah_agent, default_source):
    """Test that mark_access_bulk updates last_accessed_at for multiple files."""
    # asyncio.sleep is used instead of time.sleep so the wait does not block the event loop
    import asyncio

    # Create multiple files and attach them
    files = []
    for i in range(3):
        file_metadata = PydanticFileMetadata(
            file_name=f"test_file_{i}.txt",
            organization_id=default_user.organization_id,
            source_id=default_source.id,
        )
        file = await server.file_manager.create_file(file_metadata=file_metadata, actor=default_user, text=f"test content {i}")
        files.append(file)

    # Attach all files (they'll be open by default)
    attached_files = []
    for file in files:
        file_agent, closed_files = await server.file_agent_manager.attach_file(
            agent_id=sarah_agent.id,
            file_id=file.id,
            file_name=file.file_name,
            actor=default_user,
            visible_content=f"content for {file.file_name}",
        )
        attached_files.append(file_agent)

    # Get initial timestamps
    initial_times = {}
    for file_agent in attached_files:
        fa = await server.file_agent_manager.get_file_agent_by_id(agent_id=sarah_agent.id, file_id=file_agent.file_id, actor=default_user)
        initial_times[fa.file_name] = fa.last_accessed_at

    # Wait a moment to ensure timestamp difference
    await asyncio.sleep(1.1)

    # Use mark_access_bulk on subset of files
    file_names_to_mark = [files[0].file_name, files[2].file_name]
    await server.file_agent_manager.mark_access_bulk(agent_id=sarah_agent.id, file_names=file_names_to_mark, actor=default_user)

    # Check that only marked files have updated timestamps
    for file in files:
        fa = await server.file_agent_manager.get_file_agent_by_id(agent_id=sarah_agent.id, file_id=file.id, actor=default_user)

        if file.file_name in file_names_to_mark:
            assert fa.last_accessed_at > initial_times[file.file_name], f"File {file.file_name} should have updated timestamp"
        else:
            assert fa.last_accessed_at == initial_times[file.file_name], f"File {file.file_name} should not have updated timestamp"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_lru_eviction_on_attach(server, default_user, sarah_agent, default_source):
    """Test that attaching files beyond MAX_FILES_OPEN triggers LRU eviction."""
    # asyncio.sleep is used instead of time.sleep so the wait does not block the event loop
    import asyncio

    from letta.constants import MAX_FILES_OPEN

    # Create more files than the limit
    files = []
    for i in range(MAX_FILES_OPEN + 2):  # 7 files for MAX_FILES_OPEN=5
        file_metadata = PydanticFileMetadata(
            file_name=f"lru_test_file_{i}.txt",
            organization_id=default_user.organization_id,
            source_id=default_source.id,
        )
        file = await server.file_manager.create_file(file_metadata=file_metadata, actor=default_user, text=f"test content {i}")
        files.append(file)

    # Attach files one by one with small delays to ensure different timestamps
    attached_files = []
    all_closed_files = []

    for i, file in enumerate(files):
        if i > 0:
            await asyncio.sleep(0.1)  # Small delay to ensure different timestamps

        file_agent, closed_files = await server.file_agent_manager.attach_file(
            agent_id=sarah_agent.id,
            file_id=file.id,
            file_name=file.file_name,
            actor=default_user,
            visible_content=f"content for {file.file_name}",
        )
        attached_files.append(file_agent)
        all_closed_files.extend(closed_files)

        # Check that we never exceed MAX_FILES_OPEN
        open_files = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user, is_open_only=True)
        assert len(open_files) <= MAX_FILES_OPEN, f"Should never exceed {MAX_FILES_OPEN} open files"

    # Should have closed exactly 2 files (7 - 5 = 2)
    assert len(all_closed_files) == 2, f"Should have closed 2 files, but closed: {all_closed_files}"

    # Check that the oldest files were closed (first 2 files attached)
    expected_closed = [files[0].file_name, files[1].file_name]
    assert set(all_closed_files) == set(expected_closed), f"Wrong files closed. Expected {expected_closed}, got {all_closed_files}"

    # Check that exactly MAX_FILES_OPEN files are open
    open_files = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user, is_open_only=True)
    assert len(open_files) == MAX_FILES_OPEN

    # Check that the most recently attached files are still open
    open_file_names = {f.file_name for f in open_files}
    expected_open = {files[i].file_name for i in range(2, MAX_FILES_OPEN + 2)}  # files 2-6
    assert open_file_names == expected_open
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_lru_eviction_on_open_file(server, default_user, sarah_agent, default_source):
    """Test that opening a file beyond MAX_FILES_OPEN triggers LRU eviction."""
    # asyncio.sleep is used instead of time.sleep so the wait does not block the event loop
    import asyncio

    from letta.constants import MAX_FILES_OPEN

    # Create one more file than the limit
    files = []
    for i in range(MAX_FILES_OPEN + 1):  # 6 files for MAX_FILES_OPEN=5
        file_metadata = PydanticFileMetadata(
            file_name=f"open_test_file_{i}.txt",
            organization_id=default_user.organization_id,
            source_id=default_source.id,
        )
        file = await server.file_manager.create_file(file_metadata=file_metadata, actor=default_user, text=f"test content {i}")
        files.append(file)

    # Attach first MAX_FILES_OPEN files
    for i in range(MAX_FILES_OPEN):
        await asyncio.sleep(0.1)  # Small delay for different timestamps
        await server.file_agent_manager.attach_file(
            agent_id=sarah_agent.id,
            file_id=files[i].id,
            file_name=files[i].file_name,
            actor=default_user,
            visible_content=f"content for {files[i].file_name}",
        )

    # Attach the last file as closed
    await server.file_agent_manager.attach_file(
        agent_id=sarah_agent.id,
        file_id=files[-1].id,
        file_name=files[-1].file_name,
        actor=default_user,
        is_open=False,
        visible_content=f"content for {files[-1].file_name}",
    )

    # All files should be attached but only MAX_FILES_OPEN should be open
    all_files = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user)
    open_files = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user, is_open_only=True)
    assert len(all_files) == MAX_FILES_OPEN + 1
    assert len(open_files) == MAX_FILES_OPEN

    # Wait a moment
    await asyncio.sleep(0.1)

    # Now "open" the last file using the efficient method
    closed_files, was_already_open = await server.file_agent_manager.enforce_max_open_files_and_open(
        agent_id=sarah_agent.id, file_id=files[-1].id, file_name=files[-1].file_name, actor=default_user, visible_content="updated content"
    )

    # Should have closed 1 file (the oldest one)
    assert len(closed_files) == 1, f"Should have closed 1 file, got: {closed_files}"
    assert closed_files[0] == files[0].file_name, f"Should have closed oldest file {files[0].file_name}"

    # Check that exactly MAX_FILES_OPEN files are still open
    open_files = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user, is_open_only=True)
    assert len(open_files) == MAX_FILES_OPEN

    # Check that the newly opened file is open and the oldest is closed
    last_file_agent = await server.file_agent_manager.get_file_agent_by_id(
        agent_id=sarah_agent.id, file_id=files[-1].id, actor=default_user
    )
    first_file_agent = await server.file_agent_manager.get_file_agent_by_id(
        agent_id=sarah_agent.id, file_id=files[0].id, actor=default_user
    )

    assert last_file_agent.is_open is True, "Last file should be open"
    assert first_file_agent.is_open is False, "First file should be closed"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_lru_no_eviction_when_reopening_same_file(server, default_user, sarah_agent, default_source):
    """Test that reopening an already open file doesn't trigger unnecessary eviction."""
    # asyncio.sleep is used instead of time.sleep so the wait does not block the event loop
    import asyncio

    from letta.constants import MAX_FILES_OPEN

    # Create files equal to the limit
    files = []
    for i in range(MAX_FILES_OPEN):
        file_metadata = PydanticFileMetadata(
            file_name=f"reopen_test_file_{i}.txt",
            organization_id=default_user.organization_id,
            source_id=default_source.id,
        )
        file = await server.file_manager.create_file(file_metadata=file_metadata, actor=default_user, text=f"test content {i}")
        files.append(file)

    # Attach all files (they'll be open)
    for file in files:
        await asyncio.sleep(0.1)  # Small delay for different timestamps
        await server.file_agent_manager.attach_file(
            agent_id=sarah_agent.id,
            file_id=file.id,
            file_name=file.file_name,
            actor=default_user,
            visible_content=f"content for {file.file_name}",
        )

    # All files should be open
    open_files = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user, is_open_only=True)
    assert len(open_files) == MAX_FILES_OPEN
    initial_open_names = {f.file_name for f in open_files}

    # Wait a moment
    await asyncio.sleep(0.1)

    # "Reopen" the last file (which is already open)
    closed_files, was_already_open = await server.file_agent_manager.enforce_max_open_files_and_open(
        agent_id=sarah_agent.id, file_id=files[-1].id, file_name=files[-1].file_name, actor=default_user, visible_content="updated content"
    )

    # Should not have closed any files since we're within the limit
    assert len(closed_files) == 0, f"Should not have closed any files when reopening, got: {closed_files}"
    assert was_already_open is True, "File should have been detected as already open"

    # All the same files should still be open
    open_files = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user, is_open_only=True)
    assert len(open_files) == MAX_FILES_OPEN
    final_open_names = {f.file_name for f in open_files}
    assert initial_open_names == final_open_names, "Same files should remain open"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_last_accessed_at_updates_correctly(server, default_user, sarah_agent, default_source):
    """Test that last_accessed_at is updated in the correct scenarios."""
    # asyncio.sleep is used instead of time.sleep so the waits do not block the event loop
    import asyncio

    # Create and attach a file
    file_metadata = PydanticFileMetadata(
        file_name="timestamp_test.txt",
        organization_id=default_user.organization_id,
        source_id=default_source.id,
    )
    file = await server.file_manager.create_file(file_metadata=file_metadata, actor=default_user, text="test content")

    file_agent, closed_files = await server.file_agent_manager.attach_file(
        agent_id=sarah_agent.id, file_id=file.id, file_name=file.file_name, actor=default_user, visible_content="initial content"
    )

    initial_time = file_agent.last_accessed_at
    await asyncio.sleep(1.1)

    # Test update_file_agent_by_id updates timestamp
    updated_agent = await server.file_agent_manager.update_file_agent_by_id(
        agent_id=sarah_agent.id, file_id=file.id, actor=default_user, visible_content="updated content"
    )
    assert updated_agent.last_accessed_at > initial_time, "update_file_agent_by_id should update timestamp"

    await asyncio.sleep(1.1)
    prev_time = updated_agent.last_accessed_at

    # Test update_file_agent_by_name updates timestamp
    updated_agent2 = await server.file_agent_manager.update_file_agent_by_name(
        agent_id=sarah_agent.id, file_name=file.file_name, actor=default_user, is_open=False
    )
    assert updated_agent2.last_accessed_at > prev_time, "update_file_agent_by_name should update timestamp"

    await asyncio.sleep(1.1)
    prev_time = updated_agent2.last_accessed_at

    # Test mark_access updates timestamp
    await server.file_agent_manager.mark_access(agent_id=sarah_agent.id, file_id=file.id, actor=default_user)

    final_agent = await server.file_agent_manager.get_file_agent_by_id(agent_id=sarah_agent.id, file_id=file.id, actor=default_user)
    assert final_agent.last_accessed_at > prev_time, "mark_access should update timestamp"
|
||||
|
||||
Reference in New Issue
Block a user