diff --git a/alembic/versions/1af251a42c06_fix_files_agents_constraints.py b/alembic/versions/1af251a42c06_fix_files_agents_constraints.py new file mode 100644 index 00000000..b5b40eee --- /dev/null +++ b/alembic/versions/1af251a42c06_fix_files_agents_constraints.py @@ -0,0 +1,45 @@ +"""Fix files_agents constraints + +Revision ID: 1af251a42c06 +Revises: 51999513bcf1 +Create Date: 2025-06-30 11:50:42.200885 + +""" + +from typing import Sequence, Union + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "1af251a42c06" +down_revision: Union[str, None] = "51999513bcf1" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index("ix_files_agents_agent_file_name", table_name="files_agents") + op.drop_index("ix_files_agents_file_id_agent_id", table_name="files_agents") + op.drop_constraint("uq_files_agents_agent_file_name", "files_agents", type_="unique") + op.drop_constraint("uq_files_agents_file_agent", "files_agents", type_="unique") + op.create_index("ix_agent_filename", "files_agents", ["agent_id", "file_name"], unique=False) + op.create_index("ix_file_agent", "files_agents", ["file_id", "agent_id"], unique=False) + op.create_unique_constraint("uq_agent_filename", "files_agents", ["agent_id", "file_name"]) + op.create_unique_constraint("uq_file_agent", "files_agents", ["file_id", "agent_id"]) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_constraint("uq_file_agent", "files_agents", type_="unique") + op.drop_constraint("uq_agent_filename", "files_agents", type_="unique") + op.drop_index("ix_file_agent", table_name="files_agents") + op.drop_index("ix_agent_filename", table_name="files_agents") + op.create_unique_constraint("uq_files_agents_file_agent", "files_agents", ["file_id", "agent_id"], postgresql_nulls_not_distinct=False) + op.create_unique_constraint( + "uq_files_agents_agent_file_name", "files_agents", ["agent_id", "file_name"], postgresql_nulls_not_distinct=False + ) + op.create_index("ix_files_agents_file_id_agent_id", "files_agents", ["file_id", "agent_id"], unique=False) + op.create_index("ix_files_agents_agent_file_name", "files_agents", ["agent_id", "file_name"], unique=False) + # ### end Alembic commands ### diff --git a/letta/llm_api/openai_client.py b/letta/llm_api/openai_client.py index 849b2851..d7c4bd43 100644 --- a/letta/llm_api/openai_client.py +++ b/letta/llm_api/openai_client.py @@ -261,6 +261,7 @@ class OpenAIClient(LLMClientBase): """ kwargs = await self._prepare_client_kwargs_async(llm_config) client = AsyncOpenAI(**kwargs) + response: ChatCompletion = await client.chat.completions.create(**request_data) return response.model_dump() diff --git a/letta/orm/files_agents.py b/letta/orm/files_agents.py index 4cb9b5c5..fcb32468 100644 --- a/letta/orm/files_agents.py +++ b/letta/orm/files_agents.py @@ -16,28 +16,44 @@ if TYPE_CHECKING: class FileAgent(SqlalchemyBase, OrganizationMixin): - """ - Join table between File and Agent. - - Tracks whether a file is currently “open” for the agent and - the specific excerpt (grepped section) the agent is looking at. - """ - __tablename__ = "files_agents" __table_args__ = ( - Index("ix_files_agents_file_id_agent_id", "file_id", "agent_id"), - UniqueConstraint("file_id", "agent_id", name="uq_files_agents_file_agent"), - UniqueConstraint("agent_id", "file_name", name="uq_files_agents_agent_file_name"), - Index("ix_files_agents_agent_file_name", "agent_id", "file_name"), + # (file_id, agent_id) must be unique + UniqueConstraint("file_id", "agent_id", name="uq_file_agent"), + # (file_name, agent_id) must be unique + UniqueConstraint("agent_id", "file_name", name="uq_agent_filename"), + # helpful indexes for look-ups + Index("ix_file_agent", "file_id", "agent_id"), + Index("ix_agent_filename", "agent_id", "file_name"), ) __pydantic_model__ = PydanticFileAgent - # TODO: We want to migrate all the ORM models to do this, so we will need to move this to the SqlalchemyBase - # TODO: Some still rely on the Pydantic object to do this - id: Mapped[str] = mapped_column(String, primary_key=True, default=lambda: f"file_agent-{uuid.uuid4()}") - file_id: Mapped[str] = mapped_column(String, ForeignKey("files.id", ondelete="CASCADE"), primary_key=True, doc="ID of the file.") - file_name: Mapped[str] = mapped_column(String, nullable=False, doc="Denormalized copy of files.file_name; unique per agent.") - agent_id: Mapped[str] = mapped_column(String, ForeignKey("agents.id", ondelete="CASCADE"), primary_key=True, doc="ID of the agent.") + # single-column surrogate PK + id: Mapped[str] = mapped_column( + String, + primary_key=True, + default=lambda: f"file_agent-{uuid.uuid4()}", + ) + + # not part of the PK, but NOT NULL + FK + file_id: Mapped[str] = mapped_column( + String, + ForeignKey("files.id", ondelete="CASCADE"), + nullable=False, + doc="ID of the file", + ) + agent_id: Mapped[str] = mapped_column( + String, + ForeignKey("agents.id", ondelete="CASCADE"), + nullable=False, + doc="ID of the agent", + ) + + file_name: Mapped[str] = mapped_column( + String, + nullable=False, + doc="Denormalized copy of files.file_name; unique per agent", + ) is_open: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True, doc="True if the agent currently has the file open.") visible_content: Mapped[Optional[str]] = mapped_column(Text, nullable=True, doc="Portion of the file the agent is focused on.") diff --git a/letta/server/server.py b/letta/server/server.py index f2ee5883..06945f90 100644 --- a/letta/server/server.py +++ b/letta/server/server.py @@ -1381,26 +1381,6 @@ class SyncServer(Server): ) await self.agent_manager.delete_agent_async(agent_id=sleeptime_agent_state.id, actor=actor) - async def _upsert_file_to_agent(self, agent_id: str, file_metadata_with_content: FileMetadata, actor: User) -> List[str]: - """ - Internal method to create or update a file <-> agent association - - Returns: - List of file names that were closed due to LRU eviction - """ - # TODO: Maybe have LineChunker object be on the server level? - content_lines = LineChunker().chunk_text(file_metadata=file_metadata_with_content) - visible_content = "\n".join(content_lines) - - file_agent, closed_files = await self.file_agent_manager.attach_file( - agent_id=agent_id, - file_id=file_metadata_with_content.id, - file_name=file_metadata_with_content.file_name, - actor=actor, - visible_content=visible_content, - ) - return closed_files - async def _remove_file_from_agent(self, agent_id: str, file_id: str, actor: User) -> None: """ Internal method to remove a document block for an agent. @@ -1430,9 +1410,23 @@ class SyncServer(Server): logger.info(f"Inserting document into context window for source: {source_id}") logger.info(f"Attached agents: {[a.id for a in agent_states]}") - # Collect any files that were closed due to LRU eviction during bulk attach + # Generate visible content for the file + line_chunker = LineChunker() + content_lines = line_chunker.chunk_text(file_metadata=file_metadata_with_content) + visible_content = "\n".join(content_lines) + visible_content_map = {file_metadata_with_content.file_name: visible_content} + + # Attach file to each agent using bulk method (one file per agent, but atomic per agent) all_closed_files = await asyncio.gather( - *(self._upsert_file_to_agent(agent_state.id, file_metadata_with_content, actor) for agent_state in agent_states) + *( + self.file_agent_manager.attach_files_bulk( + agent_id=agent_state.id, + files_metadata=[file_metadata_with_content], + visible_content_map=visible_content_map, + actor=actor, + ) + for agent_state in agent_states + ) ) # Flatten and log if any files were closed closed_files = [file for closed_list in all_closed_files for file in closed_list] @@ -1448,14 +1442,23 @@ class SyncServer(Server): Insert the uploaded documents into the context window of an agent attached to the given source. """ - logger.info(f"Inserting documents into context window for agent_state: {agent_state.id}") + logger.info(f"Inserting {len(file_metadata_with_content)} documents into context window for agent_state: {agent_state.id}") - # Collect any files that were closed due to LRU eviction during bulk insert - all_closed_files = await asyncio.gather( - *(self._upsert_file_to_agent(agent_state.id, file_metadata, actor) for file_metadata in file_metadata_with_content) + # Generate visible content for each file + line_chunker = LineChunker() + visible_content_map = {} + for file_metadata in file_metadata_with_content: + content_lines = line_chunker.chunk_text(file_metadata=file_metadata) + visible_content_map[file_metadata.file_name] = "\n".join(content_lines) + + # Use bulk attach to avoid race conditions and duplicate LRU eviction decisions + closed_files = await self.file_agent_manager.attach_files_bulk( + agent_id=agent_state.id, + files_metadata=file_metadata_with_content, + visible_content_map=visible_content_map, + actor=actor, ) - # Flatten and log if any files were closed - closed_files = [file for closed_list in all_closed_files for file in closed_list] + if closed_files: logger.info(f"LRU eviction closed {len(closed_files)} files during bulk insert: {closed_files}") diff --git a/letta/services/files_agents_manager.py b/letta/services/files_agents_manager.py index 6ed3e052..a4abab31 100644 --- a/letta/services/files_agents_manager.py +++ b/letta/services/files_agents_manager.py @@ -4,15 +4,19 @@ from typing import List, Optional from sqlalchemy import and_, func, select, update from letta.constants import MAX_FILES_OPEN +from letta.log import get_logger from letta.orm.errors import NoResultFound from letta.orm.files_agents import FileAgent as FileAgentModel from letta.otel.tracing import trace_method from letta.schemas.block import Block as PydanticBlock from letta.schemas.file import FileAgent as PydanticFileAgent +from letta.schemas.file import FileMetadata from letta.schemas.user import User as PydanticUser from letta.server.db import db_registry from letta.utils import enforce_types +logger = get_logger(__name__) + class FileAgentManager: """High-level helpers for CRUD / listing on the `files_agents` join table.""" @@ -423,6 +427,117 @@ class FileAgentManager: return closed_file_names, file_was_already_open + @enforce_types + @trace_method + async def attach_files_bulk( + self, + *, + agent_id: str, + files_metadata: list[FileMetadata], + visible_content_map: Optional[dict[str, str]] = None, + actor: PydanticUser, + ) -> list[str]: + """Atomically attach many files, applying an LRU cap with one commit.""" + if not files_metadata: + return [] + + # TODO: This is not strictly necessary, as the file_metadata should never be duped + # TODO: But we have this as a protection, check logs for details + # dedupe while preserving caller order + seen: set[str] = set() + ordered_unique: list[FileMetadata] = [] + for m in files_metadata: + if m.file_name not in seen: + ordered_unique.append(m) + seen.add(m.file_name) + if (dup_cnt := len(files_metadata) - len(ordered_unique)) > 0: + logger.warning( + "attach_files_bulk: removed %d duplicate file(s) for agent %s", + dup_cnt, + agent_id, + ) + + now = datetime.now(timezone.utc) + vc_for = visible_content_map or {} + + async with db_registry.async_session() as session: + # fetch existing assoc rows for requested names + existing_q = select(FileAgentModel).where( + FileAgentModel.agent_id == agent_id, + FileAgentModel.organization_id == actor.organization_id, + FileAgentModel.file_name.in_(seen), + ) + existing_rows = (await session.execute(existing_q)).scalars().all() + existing_by_name = {r.file_name: r for r in existing_rows} + + # snapshot current OPEN rows (oldest first) + open_q = ( + select(FileAgentModel) + .where( + FileAgentModel.agent_id == agent_id, + FileAgentModel.organization_id == actor.organization_id, + FileAgentModel.is_open.is_(True), + ) + .order_by(FileAgentModel.last_accessed_at.asc()) + ) + currently_open = (await session.execute(open_q)).scalars().all() + + new_names = [m.file_name for m in ordered_unique] + new_names_set = set(new_names) + still_open_names = [r.file_name for r in currently_open if r.file_name not in new_names_set] + + # decide final open set + if len(new_names) >= MAX_FILES_OPEN: + final_open = new_names[:MAX_FILES_OPEN] + else: + room_for_old = MAX_FILES_OPEN - len(new_names) + final_open = new_names + still_open_names[-room_for_old:] + final_open_set = set(final_open) + + closed_file_names = [r.file_name for r in currently_open if r.file_name not in final_open_set] + # Add new files that won't be opened due to MAX_FILES_OPEN limit + if len(new_names) >= MAX_FILES_OPEN: + closed_file_names.extend(new_names[MAX_FILES_OPEN:]) + evicted_ids = [r.file_id for r in currently_open if r.file_name in closed_file_names] + + # upsert requested files + for meta in ordered_unique: + is_now_open = meta.file_name in final_open_set + vc = vc_for.get(meta.file_name, "") if is_now_open else None + + if row := existing_by_name.get(meta.file_name): + row.is_open = is_now_open + row.visible_content = vc + row.last_accessed_at = now + session.add(row) # already present, but safe + else: + session.add( + FileAgentModel( + agent_id=agent_id, + file_id=meta.id, + file_name=meta.file_name, + organization_id=actor.organization_id, + is_open=is_now_open, + visible_content=vc, + last_accessed_at=now, + ) + ) + + # bulk-close evicted rows + if evicted_ids: + await session.execute( + update(FileAgentModel) + .where( + FileAgentModel.agent_id == agent_id, + FileAgentModel.organization_id == actor.organization_id, + FileAgentModel.file_id.in_(evicted_ids), + ) + .values(is_open=False, visible_content=None) + ) + + await session.commit() + return closed_file_names + async def _get_association_by_file_id(self, session, agent_id: str, file_id: str, actor: PydanticUser) -> FileAgentModel: q = select(FileAgentModel).where( and_( diff --git a/tests/test_managers.py b/tests/test_managers.py index 751d514f..d574bf1b 100644 --- a/tests/test_managers.py +++ b/tests/test_managers.py @@ -7415,3 +7415,273 @@ async def test_last_accessed_at_updates_correctly(server, default_user, sarah_ag final_agent = await server.file_agent_manager.get_file_agent_by_id(agent_id=sarah_agent.id, file_id=file.id, actor=default_user) assert final_agent.last_accessed_at > prev_time, "mark_access should update timestamp" + + +@pytest.mark.asyncio +async def test_attach_files_bulk_basic(server, default_user, sarah_agent, default_source): + """Test basic functionality of attach_files_bulk method.""" + # Create multiple files + files = [] + for i in range(3): + file_metadata = PydanticFileMetadata( + file_name=f"bulk_test_{i}.txt", + organization_id=default_user.organization_id, + source_id=default_source.id, + ) + file = await server.file_manager.create_file(file_metadata=file_metadata, actor=default_user, text=f"content {i}") + files.append(file) + + # Create visible content map + visible_content_map = {f"bulk_test_{i}.txt": f"visible content {i}" for i in range(3)} + + # Bulk attach files + closed_files = await server.file_agent_manager.attach_files_bulk( + agent_id=sarah_agent.id, + files_metadata=files, + visible_content_map=visible_content_map, + actor=default_user, + ) + + # Should not close any files since we're under the limit + assert closed_files == [] + + # Verify all files are attached and open + attached_files = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user, is_open_only=True) + assert len(attached_files) == 3 + + attached_file_names = {f.file_name for f in attached_files} + expected_names = {f"bulk_test_{i}.txt" for i in range(3)} + assert attached_file_names == expected_names + + # Verify visible content is set correctly + for i, attached_file in enumerate(attached_files): + if attached_file.file_name == f"bulk_test_{i}.txt": + assert attached_file.visible_content == f"visible content {i}" + + +@pytest.mark.asyncio +async def test_attach_files_bulk_deduplication(server, default_user, sarah_agent, default_source): + """Test that attach_files_bulk properly deduplicates files with same names.""" + # Create files with same name (different IDs) + file_metadata_1 = PydanticFileMetadata( + file_name="duplicate_test.txt", + organization_id=default_user.organization_id, + source_id=default_source.id, + ) + file1 = await server.file_manager.create_file(file_metadata=file_metadata_1, actor=default_user, text="content 1") + + file_metadata_2 = PydanticFileMetadata( + file_name="duplicate_test.txt", + organization_id=default_user.organization_id, + source_id=default_source.id, + ) + file2 = await server.file_manager.create_file(file_metadata=file_metadata_2, actor=default_user, text="content 2") + + # Try to attach both files (same name, different IDs) + files_to_attach = [file1, file2] + visible_content_map = {"duplicate_test.txt": "visible content"} + + # Bulk attach should deduplicate + closed_files = await server.file_agent_manager.attach_files_bulk( + agent_id=sarah_agent.id, + files_metadata=files_to_attach, + visible_content_map=visible_content_map, + actor=default_user, + ) + + # Should only attach one file (deduplicated) + attached_files = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user) + assert len(attached_files) == 1 + assert attached_files[0].file_name == "duplicate_test.txt" + + +@pytest.mark.asyncio +async def test_attach_files_bulk_lru_eviction(server, default_user, sarah_agent, default_source): + """Test that attach_files_bulk properly handles LRU eviction without duplicates.""" + import time + + from letta.constants import MAX_FILES_OPEN + + # First, fill up to the max with individual files + existing_files = [] + for i in range(MAX_FILES_OPEN): + file_metadata = PydanticFileMetadata( + file_name=f"existing_{i}.txt", + organization_id=default_user.organization_id, + source_id=default_source.id, + ) + file = await server.file_manager.create_file(file_metadata=file_metadata, actor=default_user, text=f"existing {i}") + existing_files.append(file) + + time.sleep(0.05) # Small delay for different timestamps + await server.file_agent_manager.attach_file( + agent_id=sarah_agent.id, + file_id=file.id, + file_name=file.file_name, + actor=default_user, + visible_content=f"existing content {i}", + ) + + # Verify we're at the limit + open_files = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user, is_open_only=True) + assert len(open_files) == MAX_FILES_OPEN + + # Now bulk attach 3 new files (should trigger LRU eviction) + new_files = [] + for i in range(3): + file_metadata = PydanticFileMetadata( + file_name=f"new_bulk_{i}.txt", + organization_id=default_user.organization_id, + source_id=default_source.id, + ) + file = await server.file_manager.create_file(file_metadata=file_metadata, actor=default_user, text=f"new content {i}") + new_files.append(file) + + visible_content_map = {f"new_bulk_{i}.txt": f"new visible {i}" for i in range(3)} + + # Bulk attach should evict oldest files + closed_files = await server.file_agent_manager.attach_files_bulk( + agent_id=sarah_agent.id, + files_metadata=new_files, + visible_content_map=visible_content_map, + actor=default_user, + ) + + # Should have closed exactly 3 files (oldest ones) + assert len(closed_files) == 3 + + # CRITICAL: Verify no duplicates in closed_files list + assert len(closed_files) == len(set(closed_files)), f"Duplicate file names in closed_files: {closed_files}" + + # Verify expected files were closed (oldest 3) + expected_closed = {f"existing_{i}.txt" for i in range(3)} + actual_closed = set(closed_files) + assert actual_closed == expected_closed + + # Verify we still have exactly MAX_FILES_OPEN files open + open_files_after = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user, is_open_only=True) + assert len(open_files_after) == MAX_FILES_OPEN + + # Verify the new files are open + open_file_names = {f.file_name for f in open_files_after} + for i in range(3): + assert f"new_bulk_{i}.txt" in open_file_names + + +@pytest.mark.asyncio +async def test_attach_files_bulk_mixed_existing_new(server, default_user, sarah_agent, default_source): + """Test bulk attach with mix of existing and new files.""" + # Create and attach one file individually first + existing_file_metadata = PydanticFileMetadata( + file_name="existing_file.txt", + organization_id=default_user.organization_id, + source_id=default_source.id, + ) + existing_file = await server.file_manager.create_file(file_metadata=existing_file_metadata, actor=default_user, text="existing") + + await server.file_agent_manager.attach_file( + agent_id=sarah_agent.id, + file_id=existing_file.id, + file_name=existing_file.file_name, + actor=default_user, + visible_content="old content", + is_open=False, # Start as closed + ) + + # Create new files + new_files = [] + for i in range(2): + file_metadata = PydanticFileMetadata( + file_name=f"new_file_{i}.txt", + organization_id=default_user.organization_id, + source_id=default_source.id, + ) + file = await server.file_manager.create_file(file_metadata=file_metadata, actor=default_user, text=f"new {i}") + new_files.append(file) + + # Bulk attach: existing file + new files + files_to_attach = [existing_file] + new_files + visible_content_map = { + "existing_file.txt": "updated content", + "new_file_0.txt": "new content 0", + "new_file_1.txt": "new content 1", + } + + closed_files = await server.file_agent_manager.attach_files_bulk( + agent_id=sarah_agent.id, + files_metadata=files_to_attach, + visible_content_map=visible_content_map, + actor=default_user, + ) + + # Should not close any files + assert closed_files == [] + + # Verify all files are now open + open_files = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user, is_open_only=True) + assert len(open_files) == 3 + + # Verify existing file was updated + existing_file_agent = await server.file_agent_manager.get_file_agent_by_file_name( + agent_id=sarah_agent.id, file_name="existing_file.txt", actor=default_user + ) + assert existing_file_agent.is_open is True + assert existing_file_agent.visible_content == "updated content" + + +@pytest.mark.asyncio +async def test_attach_files_bulk_empty_list(server, default_user, sarah_agent): + """Test attach_files_bulk with empty file list.""" + closed_files = await server.file_agent_manager.attach_files_bulk( + agent_id=sarah_agent.id, + files_metadata=[], + visible_content_map={}, + actor=default_user, + ) + + assert closed_files == [] + + # Verify no files are attached + attached_files = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user) + assert len(attached_files) == 0 + + +@pytest.mark.asyncio +async def test_attach_files_bulk_oversized_bulk(server, default_user, sarah_agent, default_source): + """Test bulk attach when trying to attach more files than MAX_FILES_OPEN allows.""" + from letta.constants import MAX_FILES_OPEN + + # Create more files than the limit allows + oversized_files = [] + for i in range(MAX_FILES_OPEN + 3): # 3 more than limit + file_metadata = PydanticFileMetadata( + file_name=f"oversized_{i}.txt", + organization_id=default_user.organization_id, + source_id=default_source.id, + ) + file = await server.file_manager.create_file(file_metadata=file_metadata, actor=default_user, text=f"oversized {i}") + oversized_files.append(file) + + visible_content_map = {f"oversized_{i}.txt": f"oversized visible {i}" for i in range(MAX_FILES_OPEN + 3)} + + # Bulk attach all files (more than limit) + closed_files = await server.file_agent_manager.attach_files_bulk( + agent_id=sarah_agent.id, + files_metadata=oversized_files, + visible_content_map=visible_content_map, + actor=default_user, + ) + + # Should have closed exactly 3 files (the excess) + assert len(closed_files) == 3 + + # CRITICAL: Verify no duplicates in closed_files list + assert len(closed_files) == len(set(closed_files)), f"Duplicate file names in closed_files: {closed_files}" + + # Should have exactly MAX_FILES_OPEN files open + open_files_after = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user, is_open_only=True) + assert len(open_files_after) == MAX_FILES_OPEN + + # All files should be attached (some open, some closed) + all_files_after = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user) + assert len(all_files_after) == MAX_FILES_OPEN + 3