From 0a9ff9d0d58c334fd829adff8398d44e018939f1 Mon Sep 17 00:00:00 2001 From: Matthew Zhou Date: Mon, 23 Jun 2025 10:49:41 -0700 Subject: [PATCH] feat: Add auto-closing files via LRU metric (#2881) --- letta/constants.py | 3 + letta/server/server.py | 24 +- letta/services/files_agents_manager.py | 203 ++++++++++-- .../tool_executor/files_tool_executor.py | 29 +- tests/test_managers.py | 294 +++++++++++++++++- 5 files changed, 508 insertions(+), 45 deletions(-) diff --git a/letta/constants.py b/letta/constants.py index 3c5cb095..44bf420d 100644 --- a/letta/constants.py +++ b/letta/constants.py @@ -338,3 +338,6 @@ REDIS_INCLUDE = "include" REDIS_EXCLUDE = "exclude" REDIS_SET_DEFAULT_VAL = "None" REDIS_DEFAULT_CACHE_PREFIX = "letta_cache" + +# TODO: This is temporary, eventually use token-based eviction +MAX_FILES_OPEN = 5 diff --git a/letta/server/server.py b/letta/server/server.py index 242573bc..89c39892 100644 --- a/letta/server/server.py +++ b/letta/server/server.py @@ -1370,13 +1370,17 @@ class SyncServer(Server): ) await self.agent_manager.delete_agent_async(agent_id=sleeptime_agent_state.id, actor=actor) - async def _upsert_file_to_agent(self, agent_id: str, text: str, file_id: str, file_name: str, actor: User) -> None: + async def _upsert_file_to_agent(self, agent_id: str, text: str, file_id: str, file_name: str, actor: User) -> List[str]: """ Internal method to create or update a file <-> agent association + + Returns: + List of file names that were closed due to LRU eviction """ - await self.file_agent_manager.attach_file( + file_agent, closed_files = await self.file_agent_manager.attach_file( agent_id=agent_id, file_id=file_id, file_name=file_name, actor=actor, visible_content=text ) + return closed_files async def _remove_file_from_agent(self, agent_id: str, file_id: str, actor: User) -> None: """ @@ -1407,7 +1411,14 @@ class SyncServer(Server): logger.info(f"Inserting document into context window for source: {source_id}") logger.info(f"Attached agents: {[a.id for a in agent_states]}") - await asyncio.gather(*(self._upsert_file_to_agent(agent_state.id, text, file_id, file_name, actor) for agent_state in agent_states)) + # Collect any files that were closed due to LRU eviction during bulk attach + all_closed_files = await asyncio.gather( + *(self._upsert_file_to_agent(agent_state.id, text, file_id, file_name, actor) for agent_state in agent_states) + ) + # Flatten and log if any files were closed + closed_files = [file for closed_list in all_closed_files for file in closed_list] + if closed_files: + logger.info(f"LRU eviction closed {len(closed_files)} files during bulk attach: {closed_files}") return agent_states @@ -1423,12 +1434,17 @@ class SyncServer(Server): if len(texts) != len(file_ids): raise ValueError(f"Mismatch between number of texts ({len(texts)}) and file ids ({len(file_ids)})") - await asyncio.gather( + # Collect any files that were closed due to LRU eviction during bulk insert + all_closed_files = await asyncio.gather( *( self._upsert_file_to_agent(agent_state.id, text, file_id, file_name, actor) for text, file_id, file_name in zip(texts, file_ids, file_names) ) ) + # Flatten and log if any files were closed + closed_files = [file for closed_list in all_closed_files for file in closed_list] + if closed_files: + logger.info(f"LRU eviction closed {len(closed_files)} files during bulk insert: {closed_files}") async def remove_file_from_context_windows(self, source_id: str, file_id: str, actor: User) -> None: """ diff --git a/letta/services/files_agents_manager.py 
b/letta/services/files_agents_manager.py index a9bb7a6d..5ae19a1b 100644 --- a/letta/services/files_agents_manager.py +++ b/letta/services/files_agents_manager.py @@ -3,6 +3,7 @@ from typing import List, Optional from sqlalchemy import and_, func, select, update +from letta.constants import MAX_FILES_OPEN from letta.orm.errors import NoResultFound from letta.orm.files_agents import FileAgent as FileAgentModel from letta.otel.tracing import trace_method @@ -27,51 +28,66 @@ class FileAgentManager: actor: PydanticUser, is_open: bool = True, visible_content: Optional[str] = None, - ) -> PydanticFileAgent: + ) -> tuple[PydanticFileAgent, List[str]]: """ - Idempotently attach *file_id* to *agent_id*. + Idempotently attach *file_id* to *agent_id* with LRU enforcement. • If the row already exists → update `is_open`, `visible_content` and always refresh `last_accessed_at`. • Otherwise create a brand-new association. + • If is_open=True, enforces MAX_FILES_OPEN using LRU eviction. + + Returns: + Tuple of (file_agent, closed_file_names) """ - async with db_registry.async_session() as session: - query = select(FileAgentModel).where( - and_( - FileAgentModel.agent_id == agent_id, - FileAgentModel.file_id == file_id, - FileAgentModel.file_name == file_name, - FileAgentModel.organization_id == actor.organization_id, + if is_open: + # Use the efficient LRU + open method + closed_files, was_already_open = await self.enforce_max_open_files_and_open( + agent_id=agent_id, file_id=file_id, file_name=file_name, actor=actor, visible_content=visible_content or "" + ) + + # Get the updated file agent to return + file_agent = await self.get_file_agent_by_id(agent_id=agent_id, file_id=file_id, actor=actor) + return file_agent, closed_files + else: + # Original logic for is_open=False + async with db_registry.async_session() as session: + query = select(FileAgentModel).where( + and_( + FileAgentModel.agent_id == agent_id, + FileAgentModel.file_id == file_id, + FileAgentModel.file_name == file_name, + FileAgentModel.organization_id == actor.organization_id, + ) ) - ) - existing = await session.scalar(query) + existing = await session.scalar(query) - now_ts = datetime.now(timezone.utc) + now_ts = datetime.now(timezone.utc) - if existing: - # update only the fields that actually changed - if existing.is_open != is_open: - existing.is_open = is_open + if existing: + # update only the fields that actually changed + if existing.is_open != is_open: + existing.is_open = is_open - if visible_content is not None and existing.visible_content != visible_content: - existing.visible_content = visible_content + if visible_content is not None and existing.visible_content != visible_content: + existing.visible_content = visible_content - existing.last_accessed_at = now_ts + existing.last_accessed_at = now_ts - await existing.update_async(session, actor=actor) - return existing.to_pydantic() + await existing.update_async(session, actor=actor) + return existing.to_pydantic(), [] - assoc = FileAgentModel( - agent_id=agent_id, - file_id=file_id, - file_name=file_name, - organization_id=actor.organization_id, - is_open=is_open, - visible_content=visible_content, - last_accessed_at=now_ts, - ) - await assoc.create_async(session, actor=actor) - return assoc.to_pydantic() + assoc = FileAgentModel( + agent_id=agent_id, + file_id=file_id, + file_name=file_name, + organization_id=actor.organization_id, + is_open=is_open, + visible_content=visible_content, + last_accessed_at=now_ts, + ) + await assoc.create_async(session, actor=actor) + return 
assoc.to_pydantic(), [] @enforce_types @trace_method @@ -246,6 +262,129 @@ class FileAgentManager: await session.execute(stmt) await session.commit() + @enforce_types + @trace_method + async def mark_access_bulk(self, *, agent_id: str, file_names: List[str], actor: PydanticUser) -> None: + """Update `last_accessed_at = now()` for multiple files by name without loading rows.""" + if not file_names: + return + + async with db_registry.async_session() as session: + stmt = ( + update(FileAgentModel) + .where( + FileAgentModel.agent_id == agent_id, + FileAgentModel.file_name.in_(file_names), + FileAgentModel.organization_id == actor.organization_id, + ) + .values(last_accessed_at=func.now()) + ) + await session.execute(stmt) + await session.commit() + + @enforce_types + @trace_method + async def enforce_max_open_files_and_open( + self, *, agent_id: str, file_id: str, file_name: str, actor: PydanticUser, visible_content: str + ) -> tuple[List[str], bool]: + """ + Efficiently handle LRU eviction and file opening in a single transaction. + + Args: + agent_id: ID of the agent + file_id: ID of the file to open + file_name: Name of the file to open + actor: User performing the action + visible_content: Content to set for the opened file + + Returns: + Tuple of (closed_file_names, file_was_already_open) + """ + async with db_registry.async_session() as session: + # Single query to get ALL open files for this agent, ordered by last_accessed_at (oldest first) + open_files_query = ( + select(FileAgentModel) + .where( + and_( + FileAgentModel.agent_id == agent_id, + FileAgentModel.organization_id == actor.organization_id, + FileAgentModel.is_open.is_(True), + ) + ) + .order_by(FileAgentModel.last_accessed_at.asc()) # Oldest first for LRU + ) + + all_open_files = (await session.execute(open_files_query)).scalars().all() + + # Check if the target file exists (open or closed) + target_file_query = select(FileAgentModel).where( + and_( + FileAgentModel.agent_id == agent_id, + FileAgentModel.organization_id == actor.organization_id, + FileAgentModel.file_name == file_name, + ) + ) + file_to_open = await session.scalar(target_file_query) + + # Separate the file we're opening from others (only if it's currently open) + other_open_files = [] + for file_agent in all_open_files: + if file_agent.file_name != file_name: + other_open_files.append(file_agent) + + file_was_already_open = file_to_open is not None and file_to_open.is_open + + # Calculate how many files need to be closed + current_other_count = len(other_open_files) + target_other_count = MAX_FILES_OPEN - 1 # Reserve 1 slot for file we're opening + + closed_file_names = [] + if current_other_count > target_other_count: + files_to_close_count = current_other_count - target_other_count + files_to_close = other_open_files[:files_to_close_count] # Take oldest + + # Bulk close files using a single UPDATE query + file_ids_to_close = [f.file_id for f in files_to_close] + closed_file_names = [f.file_name for f in files_to_close] + + if file_ids_to_close: + close_stmt = ( + update(FileAgentModel) + .where( + and_( + FileAgentModel.agent_id == agent_id, + FileAgentModel.file_id.in_(file_ids_to_close), + FileAgentModel.organization_id == actor.organization_id, + ) + ) + .values(is_open=False, visible_content=None) + ) + await session.execute(close_stmt) + + # Open the target file (update or create) + now_ts = datetime.now(timezone.utc) + + if file_to_open: + # Update existing file + file_to_open.is_open = True + file_to_open.visible_content = visible_content + 
file_to_open.last_accessed_at = now_ts + await file_to_open.update_async(session, actor=actor) + else: + # Create new file association + new_file_agent = FileAgentModel( + agent_id=agent_id, + file_id=file_id, + file_name=file_name, + organization_id=actor.organization_id, + is_open=True, + visible_content=visible_content, + last_accessed_at=now_ts, + ) + await new_file_agent.create_async(session, actor=actor) + + return closed_file_names, file_was_already_open + async def _get_association_by_file_id(self, session, agent_id: str, file_id: str, actor: PydanticUser) -> FileAgentModel: q = select(FileAgentModel).where( and_( diff --git a/letta/services/tool_executor/files_tool_executor.py b/letta/services/tool_executor/files_tool_executor.py index f9186648..6f4a67df 100644 --- a/letta/services/tool_executor/files_tool_executor.py +++ b/letta/services/tool_executor/files_tool_executor.py @@ -127,10 +127,18 @@ class LettaFileToolExecutor(ToolExecutor): content_lines = LineChunker().chunk_text(text=file.content, file_metadata=file, start=start, end=end) visible_content = "\n".join(content_lines) - await self.files_agents_manager.update_file_agent_by_id( - agent_id=agent_state.id, file_id=file_id, actor=self.actor, is_open=True, visible_content=visible_content + # Efficiently handle LRU eviction and file opening in a single transaction + closed_files, was_already_open = await self.files_agents_manager.enforce_max_open_files_and_open( + agent_id=agent_state.id, file_id=file_id, file_name=file_name, actor=self.actor, visible_content=visible_content ) - return f"Successfully opened file {file_name}, lines {start} to {end} are now visible in memory block <{file_name}>" + + success_msg = f"Successfully opened file {file_name}, lines {start} to {end} are now visible in memory block <{file_name}>" + if closed_files: + success_msg += ( + f"\nNote: Closed {len(closed_files)} least recently used file(s) due to open file limit: {', '.join(closed_files)}" + ) + + return success_msg @trace_method async def close_file(self, agent_state: AgentState, file_name: str) -> str: @@ -256,10 +264,11 @@ class LettaFileToolExecutor(ToolExecutor): total_content_size = 0 files_processed = 0 files_skipped = 0 + files_with_matches = set() # Track files that had matches for LRU policy # Use asyncio timeout to prevent hanging async def _search_files(): - nonlocal results, total_matches, total_content_size, files_processed, files_skipped + nonlocal results, total_matches, total_content_size, files_processed, files_skipped, files_with_matches for file_agent in file_agents: # Load file content @@ -336,6 +345,8 @@ class LettaFileToolExecutor(ToolExecutor): continue if pattern_regex.search(line_content): + # Mark this file as having matches for LRU tracking + files_with_matches.add(file.file_name) context = self._get_context_lines(formatted_lines, match_line_num=line_num, context_lines=context_lines or 0) # Format the match result @@ -353,6 +364,10 @@ class LettaFileToolExecutor(ToolExecutor): # Execute with timeout await asyncio.wait_for(_search_files(), timeout=self.GREP_TIMEOUT_SECONDS) + # Mark access for files that had matches + if files_with_matches: + await self.files_agents_manager.mark_access_bulk(agent_id=agent_state.id, file_names=list(files_with_matches), actor=self.actor) + # Format final results if not results or total_matches == 0: summary = f"No matches found for pattern: '{pattern}'" @@ -443,6 +458,12 @@ class LettaFileToolExecutor(ToolExecutor): passage_content = "\n".join(formatted_lines) 
results.append(f"{passage_header}\n{passage_content}") + # Mark access for files that had matches + if files_with_passages: + matched_file_names = [name for name in files_with_passages.keys() if name != "Unknown File"] + if matched_file_names: + await self.files_agents_manager.mark_access_bulk(agent_id=agent_state.id, file_names=matched_file_names, actor=self.actor) + # Create summary header file_count = len(files_with_passages) summary = f"Found {total_passages} semantic matches in {file_count} file{'s' if file_count != 1 else ''} for query: '{query}'" diff --git a/tests/test_managers.py b/tests/test_managers.py index 411f18ea..1fdd9015 100644 --- a/tests/test_managers.py +++ b/tests/test_managers.py @@ -668,7 +668,7 @@ def event_loop(request): @pytest.fixture async def file_attachment(server, default_user, sarah_agent, default_file): - assoc = await server.file_agent_manager.attach_file( + assoc, closed_files = await server.file_agent_manager.attach_file( agent_id=sarah_agent.id, file_id=default_file.id, file_name=default_file.file_name, @@ -736,7 +736,7 @@ async def test_get_context_window_basic( created_agent, create_agent_request = comprehensive_test_agent_fixture # Attach a file - assoc = await server.file_agent_manager.attach_file( + assoc, closed_files = await server.file_agent_manager.attach_file( agent_id=created_agent.id, file_id=default_file.id, file_name=default_file.file_name, @@ -6798,7 +6798,7 @@ async def test_create_mcp_server(server, default_user, event_loop): @pytest.mark.asyncio async def test_attach_creates_association(server, default_user, sarah_agent, default_file): - assoc = await server.file_agent_manager.attach_file( + assoc, closed_files = await server.file_agent_manager.attach_file( agent_id=sarah_agent.id, file_id=default_file.id, file_name=default_file.file_name, @@ -6820,7 +6820,7 @@ async def test_attach_creates_association(server, default_user, sarah_agent, def @pytest.mark.asyncio async def test_attach_is_idempotent(server, default_user, sarah_agent, default_file): - a1 = await server.file_agent_manager.attach_file( + a1, closed_files = await server.file_agent_manager.attach_file( agent_id=sarah_agent.id, file_id=default_file.id, file_name=default_file.file_name, @@ -6829,7 +6829,7 @@ async def test_attach_is_idempotent(server, default_user, sarah_agent, default_f ) # second attach with different params - a2 = await server.file_agent_manager.attach_file( + a2, closed_files = await server.file_agent_manager.attach_file( agent_id=sarah_agent.id, file_id=default_file.id, file_name=default_file.file_name, @@ -6971,3 +6971,287 @@ async def test_org_scoping( # other org should see nothing files = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=other_user_different_org) assert files == [] + + +# ====================================================================================================================== +# LRU File Management Tests +# ====================================================================================================================== + + +@pytest.mark.asyncio +async def test_mark_access_bulk(server, default_user, sarah_agent, default_source): + """Test that mark_access_bulk updates last_accessed_at for multiple files.""" + import time + + # Create multiple files and attach them + files = [] + for i in range(3): + file_metadata = PydanticFileMetadata( + file_name=f"test_file_{i}.txt", + organization_id=default_user.organization_id, + source_id=default_source.id, + ) + file = await 
server.file_manager.create_file(file_metadata=file_metadata, actor=default_user, text=f"test content {i}") + files.append(file) + + # Attach all files (they'll be open by default) + attached_files = [] + for file in files: + file_agent, closed_files = await server.file_agent_manager.attach_file( + agent_id=sarah_agent.id, + file_id=file.id, + file_name=file.file_name, + actor=default_user, + visible_content=f"content for {file.file_name}", + ) + attached_files.append(file_agent) + + # Get initial timestamps + initial_times = {} + for file_agent in attached_files: + fa = await server.file_agent_manager.get_file_agent_by_id(agent_id=sarah_agent.id, file_id=file_agent.file_id, actor=default_user) + initial_times[fa.file_name] = fa.last_accessed_at + + # Wait a moment to ensure timestamp difference + time.sleep(1.1) + + # Use mark_access_bulk on subset of files + file_names_to_mark = [files[0].file_name, files[2].file_name] + await server.file_agent_manager.mark_access_bulk(agent_id=sarah_agent.id, file_names=file_names_to_mark, actor=default_user) + + # Check that only marked files have updated timestamps + for i, file in enumerate(files): + fa = await server.file_agent_manager.get_file_agent_by_id(agent_id=sarah_agent.id, file_id=file.id, actor=default_user) + + if file.file_name in file_names_to_mark: + assert fa.last_accessed_at > initial_times[file.file_name], f"File {file.file_name} should have updated timestamp" + else: + assert fa.last_accessed_at == initial_times[file.file_name], f"File {file.file_name} should not have updated timestamp" + + +@pytest.mark.asyncio +async def test_lru_eviction_on_attach(server, default_user, sarah_agent, default_source): + """Test that attaching files beyond MAX_FILES_OPEN triggers LRU eviction.""" + import time + + from letta.constants import MAX_FILES_OPEN + + # Create more files than the limit + files = [] + for i in range(MAX_FILES_OPEN + 2): # 7 files for MAX_FILES_OPEN=5 + file_metadata = PydanticFileMetadata( + file_name=f"lru_test_file_{i}.txt", + organization_id=default_user.organization_id, + source_id=default_source.id, + ) + file = await server.file_manager.create_file(file_metadata=file_metadata, actor=default_user, text=f"test content {i}") + files.append(file) + + # Attach files one by one with small delays to ensure different timestamps + attached_files = [] + all_closed_files = [] + + for i, file in enumerate(files): + if i > 0: + time.sleep(0.1) # Small delay to ensure different timestamps + + file_agent, closed_files = await server.file_agent_manager.attach_file( + agent_id=sarah_agent.id, + file_id=file.id, + file_name=file.file_name, + actor=default_user, + visible_content=f"content for {file.file_name}", + ) + attached_files.append(file_agent) + all_closed_files.extend(closed_files) + + # Check that we never exceed MAX_FILES_OPEN + open_files = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user, is_open_only=True) + assert len(open_files) <= MAX_FILES_OPEN, f"Should never exceed {MAX_FILES_OPEN} open files" + + # Should have closed exactly 2 files (7 - 5 = 2) + assert len(all_closed_files) == 2, f"Should have closed 2 files, but closed: {all_closed_files}" + + # Check that the oldest files were closed (first 2 files attached) + expected_closed = [files[0].file_name, files[1].file_name] + assert set(all_closed_files) == set(expected_closed), f"Wrong files closed. 
Expected {expected_closed}, got {all_closed_files}" + + # Check that exactly MAX_FILES_OPEN files are open + open_files = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user, is_open_only=True) + assert len(open_files) == MAX_FILES_OPEN + + # Check that the most recently attached files are still open + open_file_names = {f.file_name for f in open_files} + expected_open = {files[i].file_name for i in range(2, MAX_FILES_OPEN + 2)} # files 2-6 + assert open_file_names == expected_open + + +@pytest.mark.asyncio +async def test_lru_eviction_on_open_file(server, default_user, sarah_agent, default_source): + """Test that opening a file beyond MAX_FILES_OPEN triggers LRU eviction.""" + import time + + from letta.constants import MAX_FILES_OPEN + + # Create files equal to the limit + files = [] + for i in range(MAX_FILES_OPEN + 1): # 6 files for MAX_FILES_OPEN=5 + file_metadata = PydanticFileMetadata( + file_name=f"open_test_file_{i}.txt", + organization_id=default_user.organization_id, + source_id=default_source.id, + ) + file = await server.file_manager.create_file(file_metadata=file_metadata, actor=default_user, text=f"test content {i}") + files.append(file) + + # Attach first MAX_FILES_OPEN files + for i in range(MAX_FILES_OPEN): + time.sleep(0.1) # Small delay for different timestamps + await server.file_agent_manager.attach_file( + agent_id=sarah_agent.id, + file_id=files[i].id, + file_name=files[i].file_name, + actor=default_user, + visible_content=f"content for {files[i].file_name}", + ) + + # Attach the last file as closed + await server.file_agent_manager.attach_file( + agent_id=sarah_agent.id, + file_id=files[-1].id, + file_name=files[-1].file_name, + actor=default_user, + is_open=False, + visible_content=f"content for {files[-1].file_name}", + ) + + # All files should be attached but only MAX_FILES_OPEN should be open + all_files = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user) + open_files = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user, is_open_only=True) + assert len(all_files) == MAX_FILES_OPEN + 1 + assert len(open_files) == MAX_FILES_OPEN + + # Wait a moment + time.sleep(0.1) + + # Now "open" the last file using the efficient method + closed_files, was_already_open = await server.file_agent_manager.enforce_max_open_files_and_open( + agent_id=sarah_agent.id, file_id=files[-1].id, file_name=files[-1].file_name, actor=default_user, visible_content="updated content" + ) + + # Should have closed 1 file (the oldest one) + assert len(closed_files) == 1, f"Should have closed 1 file, got: {closed_files}" + assert closed_files[0] == files[0].file_name, f"Should have closed oldest file {files[0].file_name}" + + # Check that exactly MAX_FILES_OPEN files are still open + open_files = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user, is_open_only=True) + assert len(open_files) == MAX_FILES_OPEN + + # Check that the newly opened file is open and the oldest is closed + last_file_agent = await server.file_agent_manager.get_file_agent_by_id( + agent_id=sarah_agent.id, file_id=files[-1].id, actor=default_user + ) + first_file_agent = await server.file_agent_manager.get_file_agent_by_id( + agent_id=sarah_agent.id, file_id=files[0].id, actor=default_user + ) + + assert last_file_agent.is_open is True, "Last file should be open" + assert first_file_agent.is_open is False, "First file should be closed" + + +@pytest.mark.asyncio +async def 
test_lru_no_eviction_when_reopening_same_file(server, default_user, sarah_agent, default_source): + """Test that reopening an already open file doesn't trigger unnecessary eviction.""" + import time + + from letta.constants import MAX_FILES_OPEN + + # Create files equal to the limit + files = [] + for i in range(MAX_FILES_OPEN): + file_metadata = PydanticFileMetadata( + file_name=f"reopen_test_file_{i}.txt", + organization_id=default_user.organization_id, + source_id=default_source.id, + ) + file = await server.file_manager.create_file(file_metadata=file_metadata, actor=default_user, text=f"test content {i}") + files.append(file) + + # Attach all files (they'll be open) + for i, file in enumerate(files): + time.sleep(0.1) # Small delay for different timestamps + await server.file_agent_manager.attach_file( + agent_id=sarah_agent.id, + file_id=file.id, + file_name=file.file_name, + actor=default_user, + visible_content=f"content for {file.file_name}", + ) + + # All files should be open + open_files = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user, is_open_only=True) + assert len(open_files) == MAX_FILES_OPEN + initial_open_names = {f.file_name for f in open_files} + + # Wait a moment + time.sleep(0.1) + + # "Reopen" the last file (which is already open) + closed_files, was_already_open = await server.file_agent_manager.enforce_max_open_files_and_open( + agent_id=sarah_agent.id, file_id=files[-1].id, file_name=files[-1].file_name, actor=default_user, visible_content="updated content" + ) + + # Should not have closed any files since we're within the limit + assert len(closed_files) == 0, f"Should not have closed any files when reopening, got: {closed_files}" + assert was_already_open is True, "File should have been detected as already open" + + # All the same files should still be open + open_files = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user, is_open_only=True) + assert len(open_files) == MAX_FILES_OPEN + final_open_names = {f.file_name for f in open_files} + assert initial_open_names == final_open_names, "Same files should remain open" + + +@pytest.mark.asyncio +async def test_last_accessed_at_updates_correctly(server, default_user, sarah_agent, default_source): + """Test that last_accessed_at is updated in the correct scenarios.""" + import time + + # Create and attach a file + file_metadata = PydanticFileMetadata( + file_name="timestamp_test.txt", + organization_id=default_user.organization_id, + source_id=default_source.id, + ) + file = await server.file_manager.create_file(file_metadata=file_metadata, actor=default_user, text="test content") + + file_agent, closed_files = await server.file_agent_manager.attach_file( + agent_id=sarah_agent.id, file_id=file.id, file_name=file.file_name, actor=default_user, visible_content="initial content" + ) + + initial_time = file_agent.last_accessed_at + time.sleep(1.1) + + # Test update_file_agent_by_id updates timestamp + updated_agent = await server.file_agent_manager.update_file_agent_by_id( + agent_id=sarah_agent.id, file_id=file.id, actor=default_user, visible_content="updated content" + ) + assert updated_agent.last_accessed_at > initial_time, "update_file_agent_by_id should update timestamp" + + time.sleep(1.1) + prev_time = updated_agent.last_accessed_at + + # Test update_file_agent_by_name updates timestamp + updated_agent2 = await server.file_agent_manager.update_file_agent_by_name( + agent_id=sarah_agent.id, file_name=file.file_name, actor=default_user, 
is_open=False + ) + assert updated_agent2.last_accessed_at > prev_time, "update_file_agent_by_name should update timestamp" + + time.sleep(1.1) + prev_time = updated_agent2.last_accessed_at + + # Test mark_access updates timestamp + await server.file_agent_manager.mark_access(agent_id=sarah_agent.id, file_id=file.id, actor=default_user) + + final_agent = await server.file_agent_manager.get_file_agent_by_id(agent_id=sarah_agent.id, file_id=file.id, actor=default_user) + assert final_agent.last_accessed_at > prev_time, "mark_access should update timestamp"
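
Editor's note: the sketch below is a minimal, self-contained model of the eviction contract this patch introduces. It is illustrative only; `LRUOpenFilesModel` is a hypothetical stand-in for the real `FileAgentManager`, which performs the same bookkeeping against the files-agents table inside a single transaction. The model captures the behavior the tests above pin down: at most MAX_FILES_OPEN files stay open per agent, the least recently accessed files are closed first, and the names of any closed files are surfaced to the caller, mirroring the (closed_file_names, was_already_open) return shape of enforce_max_open_files_and_open.

from collections import OrderedDict
from typing import List, Tuple

MAX_FILES_OPEN = 5  # mirrors letta/constants.py in this patch


class LRUOpenFilesModel:
    """Hypothetical in-memory stand-in for the per-agent open-file LRU.

    Open files live in an OrderedDict keyed by file name, ordered from
    least to most recently accessed; this plays the role that the
    last_accessed_at column plays in the real FileAgent rows.
    """

    def __init__(self, max_open: int = MAX_FILES_OPEN) -> None:
        self.max_open = max_open
        self._open: "OrderedDict[str, str]" = OrderedDict()  # file_name -> visible_content

    def mark_access(self, file_name: str) -> None:
        # Refresh recency, the moral equivalent of last_accessed_at = now().
        if file_name in self._open:
            self._open.move_to_end(file_name)

    def open_file(self, file_name: str, visible_content: str) -> Tuple[List[str], bool]:
        """Open a file, evicting least recently used files beyond the limit.

        Returns (closed_file_names, was_already_open), the same shape as
        enforce_max_open_files_and_open in the patch.
        """
        was_already_open = file_name in self._open
        self._open[file_name] = visible_content
        self._open.move_to_end(file_name)  # the opened file becomes most recent
        closed: List[str] = []
        while len(self._open) > self.max_open:
            evicted_name, _ = self._open.popitem(last=False)  # pop oldest first
            closed.append(evicted_name)
        return closed, was_already_open


if __name__ == "__main__":
    model = LRUOpenFilesModel()
    all_closed: List[str] = []
    for i in range(MAX_FILES_OPEN + 2):  # seven opens against a limit of five
        closed, _ = model.open_file(f"lru_test_file_{i}.txt", f"content {i}")
        all_closed.extend(closed)
    # The two oldest files are evicted, as test_lru_eviction_on_attach asserts.
    assert all_closed == ["lru_test_file_0.txt", "lru_test_file_1.txt"]
    # Reopening an already open file within the limit closes nothing, as
    # test_lru_no_eviction_when_reopening_same_file asserts.
    closed, was_open = model.open_file("lru_test_file_6.txt", "updated content")
    assert closed == [] and was_open is True

One behavioral difference worth noting: the patched enforce_max_open_files_and_open evicts before opening, reserving one slot (MAX_FILES_OPEN - 1 other files) so the open count never exceeds the limit even transiently within the transaction, whereas the model above inserts first and then evicts. Both reach the same final state.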