fix: Fix constraints and also implement bulk attach (#3107)

This commit is contained in:
Matthew Zhou
2025-06-30 14:27:57 -07:00
committed by GitHub
parent 3aee153040
commit 5dccccec21
6 changed files with 495 additions and 45 deletions

View File

@@ -0,0 +1,45 @@
"""Fix files_agents constraints
Revision ID: 1af251a42c06
Revises: 51999513bcf1
Create Date: 2025-06-30 11:50:42.200885
"""
from typing import Sequence, Union
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "1af251a42c06"
down_revision: Union[str, None] = "51999513bcf1"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Rename the files_agents indexes and unique constraints to their short forms."""
    # ### commands auto generated by Alembic - please adjust! ###
    # Drop the old long-named indexes and unique constraints first.
    for old_index in ("ix_files_agents_agent_file_name", "ix_files_agents_file_id_agent_id"):
        op.drop_index(old_index, table_name="files_agents")
    for old_constraint in ("uq_files_agents_agent_file_name", "uq_files_agents_file_agent"):
        op.drop_constraint(old_constraint, "files_agents", type_="unique")
    # Recreate the same index/constraint pairs under the shorter names.
    op.create_index("ix_agent_filename", "files_agents", ["agent_id", "file_name"], unique=False)
    op.create_index("ix_file_agent", "files_agents", ["file_id", "agent_id"], unique=False)
    op.create_unique_constraint("uq_agent_filename", "files_agents", ["agent_id", "file_name"])
    op.create_unique_constraint("uq_file_agent", "files_agents", ["file_id", "agent_id"])
    # ### end Alembic commands ###
def downgrade() -> None:
    """Restore the original long-form names of the files_agents indexes and constraints."""
    # ### commands auto generated by Alembic - please adjust! ###
    # Remove the short-named constraints/indexes introduced by upgrade(),
    # mirroring its order in reverse.
    for new_constraint in ("uq_file_agent", "uq_agent_filename"):
        op.drop_constraint(new_constraint, "files_agents", type_="unique")
    for new_index in ("ix_file_agent", "ix_agent_filename"):
        op.drop_index(new_index, table_name="files_agents")
    # Recreate the original long-named unique constraints and indexes.
    op.create_unique_constraint("uq_files_agents_file_agent", "files_agents", ["file_id", "agent_id"], postgresql_nulls_not_distinct=False)
    op.create_unique_constraint(
        "uq_files_agents_agent_file_name", "files_agents", ["agent_id", "file_name"], postgresql_nulls_not_distinct=False
    )
    op.create_index("ix_files_agents_file_id_agent_id", "files_agents", ["file_id", "agent_id"], unique=False)
    op.create_index("ix_files_agents_agent_file_name", "files_agents", ["agent_id", "file_name"], unique=False)
    # ### end Alembic commands ###

View File

@@ -261,6 +261,7 @@ class OpenAIClient(LLMClientBase):
"""
kwargs = await self._prepare_client_kwargs_async(llm_config)
client = AsyncOpenAI(**kwargs)
response: ChatCompletion = await client.chat.completions.create(**request_data)
return response.model_dump()

View File

@@ -16,28 +16,44 @@ if TYPE_CHECKING:
class FileAgent(SqlalchemyBase, OrganizationMixin):
"""
Join table between File and Agent.
Tracks whether a file is currently “open” for the agent and
the specific excerpt (grepped section) the agent is looking at.
"""
__tablename__ = "files_agents"
__table_args__ = (
Index("ix_files_agents_file_id_agent_id", "file_id", "agent_id"),
UniqueConstraint("file_id", "agent_id", name="uq_files_agents_file_agent"),
UniqueConstraint("agent_id", "file_name", name="uq_files_agents_agent_file_name"),
Index("ix_files_agents_agent_file_name", "agent_id", "file_name"),
# (file_id, agent_id) must be unique
UniqueConstraint("file_id", "agent_id", name="uq_file_agent"),
# (file_name, agent_id) must be unique
UniqueConstraint("agent_id", "file_name", name="uq_agent_filename"),
# helpful indexes for look-ups
Index("ix_file_agent", "file_id", "agent_id"),
Index("ix_agent_filename", "agent_id", "file_name"),
)
__pydantic_model__ = PydanticFileAgent
# TODO: We want to migrate all the ORM models to do this, so we will need to move this to the SqlalchemyBase
# TODO: Some still rely on the Pydantic object to do this
id: Mapped[str] = mapped_column(String, primary_key=True, default=lambda: f"file_agent-{uuid.uuid4()}")
file_id: Mapped[str] = mapped_column(String, ForeignKey("files.id", ondelete="CASCADE"), primary_key=True, doc="ID of the file.")
file_name: Mapped[str] = mapped_column(String, nullable=False, doc="Denormalized copy of files.file_name; unique per agent.")
agent_id: Mapped[str] = mapped_column(String, ForeignKey("agents.id", ondelete="CASCADE"), primary_key=True, doc="ID of the agent.")
# single-column surrogate PK
id: Mapped[str] = mapped_column(
String,
primary_key=True,
default=lambda: f"file_agent-{uuid.uuid4()}",
)
# not part of the PK, but NOT NULL + FK
file_id: Mapped[str] = mapped_column(
String,
ForeignKey("files.id", ondelete="CASCADE"),
nullable=False,
doc="ID of the file",
)
agent_id: Mapped[str] = mapped_column(
String,
ForeignKey("agents.id", ondelete="CASCADE"),
nullable=False,
doc="ID of the agent",
)
file_name: Mapped[str] = mapped_column(
String,
nullable=False,
doc="Denormalized copy of files.file_name; unique per agent",
)
is_open: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True, doc="True if the agent currently has the file open.")
visible_content: Mapped[Optional[str]] = mapped_column(Text, nullable=True, doc="Portion of the file the agent is focused on.")

View File

@@ -1381,26 +1381,6 @@ class SyncServer(Server):
)
await self.agent_manager.delete_agent_async(agent_id=sleeptime_agent_state.id, actor=actor)
async def _upsert_file_to_agent(self, agent_id: str, file_metadata_with_content: FileMetadata, actor: User) -> List[str]:
"""
Internal method to create or update a file <-> agent association
Returns:
List of file names that were closed due to LRU eviction
"""
# TODO: Maybe have LineChunker object be on the server level?
content_lines = LineChunker().chunk_text(file_metadata=file_metadata_with_content)
visible_content = "\n".join(content_lines)
file_agent, closed_files = await self.file_agent_manager.attach_file(
agent_id=agent_id,
file_id=file_metadata_with_content.id,
file_name=file_metadata_with_content.file_name,
actor=actor,
visible_content=visible_content,
)
return closed_files
async def _remove_file_from_agent(self, agent_id: str, file_id: str, actor: User) -> None:
"""
Internal method to remove a document block for an agent.
@@ -1430,9 +1410,23 @@ class SyncServer(Server):
logger.info(f"Inserting document into context window for source: {source_id}")
logger.info(f"Attached agents: {[a.id for a in agent_states]}")
# Collect any files that were closed due to LRU eviction during bulk attach
# Generate visible content for the file
line_chunker = LineChunker()
content_lines = line_chunker.chunk_text(file_metadata=file_metadata_with_content)
visible_content = "\n".join(content_lines)
visible_content_map = {file_metadata_with_content.file_name: visible_content}
# Attach file to each agent using bulk method (one file per agent, but atomic per agent)
all_closed_files = await asyncio.gather(
*(self._upsert_file_to_agent(agent_state.id, file_metadata_with_content, actor) for agent_state in agent_states)
*(
self.file_agent_manager.attach_files_bulk(
agent_id=agent_state.id,
files_metadata=[file_metadata_with_content],
visible_content_map=visible_content_map,
actor=actor,
)
for agent_state in agent_states
)
)
# Flatten and log if any files were closed
closed_files = [file for closed_list in all_closed_files for file in closed_list]
@@ -1448,14 +1442,23 @@ class SyncServer(Server):
Insert the uploaded documents into the context window of an agent
attached to the given source.
"""
logger.info(f"Inserting documents into context window for agent_state: {agent_state.id}")
logger.info(f"Inserting {len(file_metadata_with_content)} documents into context window for agent_state: {agent_state.id}")
# Collect any files that were closed due to LRU eviction during bulk insert
all_closed_files = await asyncio.gather(
*(self._upsert_file_to_agent(agent_state.id, file_metadata, actor) for file_metadata in file_metadata_with_content)
# Generate visible content for each file
line_chunker = LineChunker()
visible_content_map = {}
for file_metadata in file_metadata_with_content:
content_lines = line_chunker.chunk_text(file_metadata=file_metadata)
visible_content_map[file_metadata.file_name] = "\n".join(content_lines)
# Use bulk attach to avoid race conditions and duplicate LRU eviction decisions
closed_files = await self.file_agent_manager.attach_files_bulk(
agent_id=agent_state.id,
files_metadata=file_metadata_with_content,
visible_content_map=visible_content_map,
actor=actor,
)
# Flatten and log if any files were closed
closed_files = [file for closed_list in all_closed_files for file in closed_list]
if closed_files:
logger.info(f"LRU eviction closed {len(closed_files)} files during bulk insert: {closed_files}")

View File

@@ -4,15 +4,19 @@ from typing import List, Optional
from sqlalchemy import and_, func, select, update
from letta.constants import MAX_FILES_OPEN
from letta.log import get_logger
from letta.orm.errors import NoResultFound
from letta.orm.files_agents import FileAgent as FileAgentModel
from letta.otel.tracing import trace_method
from letta.schemas.block import Block as PydanticBlock
from letta.schemas.file import FileAgent as PydanticFileAgent
from letta.schemas.file import FileMetadata
from letta.schemas.user import User as PydanticUser
from letta.server.db import db_registry
from letta.utils import enforce_types
logger = get_logger(__name__)
class FileAgentManager:
"""High-level helpers for CRUD / listing on the `files_agents` join table."""
@@ -423,6 +427,117 @@ class FileAgentManager:
return closed_file_names, file_was_already_open
@enforce_types
@trace_method
async def attach_files_bulk(
self,
*,
agent_id: str,
files_metadata: list[FileMetadata],
visible_content_map: Optional[dict[str, str]] = None,
actor: PydanticUser,
) -> list[str]:
"""Atomically attach many files, applying an LRU cap with one commit."""
if not files_metadata:
return []
# TODO: This is not strictly necessary, as the file_metadata should never be duped
# TODO: But we have this as a protection, check logs for details
# dedupe while preserving caller order
seen: set[str] = set()
ordered_unique: list[FileMetadata] = []
for m in files_metadata:
if m.file_name not in seen:
ordered_unique.append(m)
seen.add(m.file_name)
if (dup_cnt := len(files_metadata) - len(ordered_unique)) > 0:
logger.warning(
"attach_files_bulk: removed %d duplicate file(s) for agent %s",
dup_cnt,
agent_id,
)
now = datetime.now(timezone.utc)
vc_for = visible_content_map or {}
async with db_registry.async_session() as session:
# fetch existing assoc rows for requested names
existing_q = select(FileAgentModel).where(
FileAgentModel.agent_id == agent_id,
FileAgentModel.organization_id == actor.organization_id,
FileAgentModel.file_name.in_(seen),
)
existing_rows = (await session.execute(existing_q)).scalars().all()
existing_by_name = {r.file_name: r for r in existing_rows}
# snapshot current OPEN rows (oldest first)
open_q = (
select(FileAgentModel)
.where(
FileAgentModel.agent_id == agent_id,
FileAgentModel.organization_id == actor.organization_id,
FileAgentModel.is_open.is_(True),
)
.order_by(FileAgentModel.last_accessed_at.asc())
)
currently_open = (await session.execute(open_q)).scalars().all()
new_names = [m.file_name for m in ordered_unique]
new_names_set = set(new_names)
still_open_names = [r.file_name for r in currently_open if r.file_name not in new_names_set]
# decide final open set
if len(new_names) >= MAX_FILES_OPEN:
final_open = new_names[:MAX_FILES_OPEN]
else:
room_for_old = MAX_FILES_OPEN - len(new_names)
final_open = new_names + still_open_names[-room_for_old:]
final_open_set = set(final_open)
closed_file_names = [r.file_name for r in currently_open if r.file_name not in final_open_set]
# Add new files that won't be opened due to MAX_FILES_OPEN limit
if len(new_names) >= MAX_FILES_OPEN:
closed_file_names.extend(new_names[MAX_FILES_OPEN:])
evicted_ids = [r.file_id for r in currently_open if r.file_name in closed_file_names]
# upsert requested files
for meta in ordered_unique:
is_now_open = meta.file_name in final_open_set
vc = vc_for.get(meta.file_name, "") if is_now_open else None
if row := existing_by_name.get(meta.file_name):
row.is_open = is_now_open
row.visible_content = vc
row.last_accessed_at = now
session.add(row) # already present, but safe
else:
session.add(
FileAgentModel(
agent_id=agent_id,
file_id=meta.id,
file_name=meta.file_name,
organization_id=actor.organization_id,
is_open=is_now_open,
visible_content=vc,
last_accessed_at=now,
)
)
# bulk-close evicted rows
if evicted_ids:
await session.execute(
update(FileAgentModel)
.where(
FileAgentModel.agent_id == agent_id,
FileAgentModel.organization_id == actor.organization_id,
FileAgentModel.file_id.in_(evicted_ids),
)
.values(is_open=False, visible_content=None)
)
await session.commit()
return closed_file_names
async def _get_association_by_file_id(self, session, agent_id: str, file_id: str, actor: PydanticUser) -> FileAgentModel:
q = select(FileAgentModel).where(
and_(

View File

@@ -7415,3 +7415,273 @@ async def test_last_accessed_at_updates_correctly(server, default_user, sarah_ag
final_agent = await server.file_agent_manager.get_file_agent_by_id(agent_id=sarah_agent.id, file_id=file.id, actor=default_user)
assert final_agent.last_accessed_at > prev_time, "mark_access should update timestamp"
@pytest.mark.asyncio
async def test_attach_files_bulk_basic(server, default_user, sarah_agent, default_source):
    """Test basic functionality of attach_files_bulk method.

    Attaches three files in a single bulk call and verifies that nothing is
    evicted, all files are open, and each association carries the visible
    content supplied for its file name.
    """
    # Create multiple files
    files = []
    for i in range(3):
        file_metadata = PydanticFileMetadata(
            file_name=f"bulk_test_{i}.txt",
            organization_id=default_user.organization_id,
            source_id=default_source.id,
        )
        file = await server.file_manager.create_file(file_metadata=file_metadata, actor=default_user, text=f"content {i}")
        files.append(file)

    # Create visible content map
    visible_content_map = {f"bulk_test_{i}.txt": f"visible content {i}" for i in range(3)}

    # Bulk attach files
    closed_files = await server.file_agent_manager.attach_files_bulk(
        agent_id=sarah_agent.id,
        files_metadata=files,
        visible_content_map=visible_content_map,
        actor=default_user,
    )

    # Should not close any files since we're under the limit
    assert closed_files == []

    # Verify all files are attached and open
    attached_files = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user, is_open_only=True)
    assert len(attached_files) == 3
    attached_file_names = {f.file_name for f in attached_files}
    expected_names = {f"bulk_test_{i}.txt" for i in range(3)}
    assert attached_file_names == expected_names

    # Verify visible content is set correctly for EVERY file.
    # BUGFIX: the previous enumerate-based loop only checked a file when its
    # position in the returned list happened to match its index, so an
    # out-of-order listing silently skipped all content verification.
    content_by_name = {f.file_name: f.visible_content for f in attached_files}
    for i in range(3):
        assert content_by_name[f"bulk_test_{i}.txt"] == f"visible content {i}"
@pytest.mark.asyncio
async def test_attach_files_bulk_deduplication(server, default_user, sarah_agent, default_source):
    """Test that attach_files_bulk properly deduplicates files with same names."""
    # Two distinct file records that share a single file name.
    file1 = await server.file_manager.create_file(
        file_metadata=PydanticFileMetadata(
            file_name="duplicate_test.txt",
            organization_id=default_user.organization_id,
            source_id=default_source.id,
        ),
        actor=default_user,
        text="content 1",
    )
    file2 = await server.file_manager.create_file(
        file_metadata=PydanticFileMetadata(
            file_name="duplicate_test.txt",
            organization_id=default_user.organization_id,
            source_id=default_source.id,
        ),
        actor=default_user,
        text="content 2",
    )

    # Bulk attach both; the manager should collapse them to one association.
    closed_files = await server.file_agent_manager.attach_files_bulk(
        agent_id=sarah_agent.id,
        files_metadata=[file1, file2],
        visible_content_map={"duplicate_test.txt": "visible content"},
        actor=default_user,
    )

    # Exactly one association should exist, under the shared name.
    attached = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user)
    assert len(attached) == 1
    assert attached[0].file_name == "duplicate_test.txt"
@pytest.mark.asyncio
async def test_attach_files_bulk_lru_eviction(server, default_user, sarah_agent, default_source):
    """Test that attach_files_bulk properly handles LRU eviction without duplicates."""
    import time

    from letta.constants import MAX_FILES_OPEN

    # First, fill up to the max with individual files
    existing_files = []
    for i in range(MAX_FILES_OPEN):
        file_metadata = PydanticFileMetadata(
            file_name=f"existing_{i}.txt",
            organization_id=default_user.organization_id,
            source_id=default_source.id,
        )
        file = await server.file_manager.create_file(file_metadata=file_metadata, actor=default_user, text=f"existing {i}")
        existing_files.append(file)
        # Small delay for different timestamps — the sleep makes each
        # association's last_accessed_at strictly later than the previous one,
        # so LRU ordering of "existing_0" .. "existing_{MAX-1}" is deterministic.
        time.sleep(0.05)
        await server.file_agent_manager.attach_file(
            agent_id=sarah_agent.id,
            file_id=file.id,
            file_name=file.file_name,
            actor=default_user,
            visible_content=f"existing content {i}",
        )

    # Verify we're at the limit
    open_files = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user, is_open_only=True)
    assert len(open_files) == MAX_FILES_OPEN

    # Now bulk attach 3 new files (should trigger LRU eviction)
    new_files = []
    for i in range(3):
        file_metadata = PydanticFileMetadata(
            file_name=f"new_bulk_{i}.txt",
            organization_id=default_user.organization_id,
            source_id=default_source.id,
        )
        file = await server.file_manager.create_file(file_metadata=file_metadata, actor=default_user, text=f"new content {i}")
        new_files.append(file)

    visible_content_map = {f"new_bulk_{i}.txt": f"new visible {i}" for i in range(3)}

    # Bulk attach should evict oldest files
    closed_files = await server.file_agent_manager.attach_files_bulk(
        agent_id=sarah_agent.id,
        files_metadata=new_files,
        visible_content_map=visible_content_map,
        actor=default_user,
    )

    # Should have closed exactly 3 files (oldest ones) — one eviction per new file
    assert len(closed_files) == 3

    # CRITICAL: Verify no duplicates in closed_files list
    assert len(closed_files) == len(set(closed_files)), f"Duplicate file names in closed_files: {closed_files}"

    # Verify expected files were closed (oldest 3) — the first three attached,
    # which have the earliest last_accessed_at timestamps
    expected_closed = {f"existing_{i}.txt" for i in range(3)}
    actual_closed = set(closed_files)
    assert actual_closed == expected_closed

    # Verify we still have exactly MAX_FILES_OPEN files open
    open_files_after = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user, is_open_only=True)
    assert len(open_files_after) == MAX_FILES_OPEN

    # Verify the new files are open (new files take priority over old ones)
    open_file_names = {f.file_name for f in open_files_after}
    for i in range(3):
        assert f"new_bulk_{i}.txt" in open_file_names
@pytest.mark.asyncio
async def test_attach_files_bulk_mixed_existing_new(server, default_user, sarah_agent, default_source):
    """Test bulk attach with mix of existing and new files."""
    # Attach one file up-front in the closed state so the bulk call must reopen it.
    existing_file = await server.file_manager.create_file(
        file_metadata=PydanticFileMetadata(
            file_name="existing_file.txt",
            organization_id=default_user.organization_id,
            source_id=default_source.id,
        ),
        actor=default_user,
        text="existing",
    )
    await server.file_agent_manager.attach_file(
        agent_id=sarah_agent.id,
        file_id=existing_file.id,
        file_name=existing_file.file_name,
        actor=default_user,
        visible_content="old content",
        is_open=False,  # Start as closed
    )

    # Two brand-new files to attach alongside the pre-existing one.
    new_files = []
    for idx in range(2):
        created = await server.file_manager.create_file(
            file_metadata=PydanticFileMetadata(
                file_name=f"new_file_{idx}.txt",
                organization_id=default_user.organization_id,
                source_id=default_source.id,
            ),
            actor=default_user,
            text=f"new {idx}",
        )
        new_files.append(created)

    # Bulk attach the existing file together with the new ones.
    closed_files = await server.file_agent_manager.attach_files_bulk(
        agent_id=sarah_agent.id,
        files_metadata=[existing_file] + new_files,
        visible_content_map={
            "existing_file.txt": "updated content",
            "new_file_0.txt": "new content 0",
            "new_file_1.txt": "new content 1",
        },
        actor=default_user,
    )

    # Well under the cap, so nothing should have been evicted.
    assert closed_files == []

    # All three files should now be open for the agent.
    open_files = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user, is_open_only=True)
    assert len(open_files) == 3

    # The pre-existing association was reopened and its content refreshed.
    refreshed = await server.file_agent_manager.get_file_agent_by_file_name(
        agent_id=sarah_agent.id, file_name="existing_file.txt", actor=default_user
    )
    assert refreshed.is_open is True
    assert refreshed.visible_content == "updated content"
@pytest.mark.asyncio
async def test_attach_files_bulk_empty_list(server, default_user, sarah_agent):
    """Test attach_files_bulk with empty file list."""
    # An empty request is a no-op that reports no evictions.
    result = await server.file_agent_manager.attach_files_bulk(
        agent_id=sarah_agent.id,
        files_metadata=[],
        visible_content_map={},
        actor=default_user,
    )
    assert result == []

    # Nothing should have been associated with the agent.
    associations = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user)
    assert len(associations) == 0
@pytest.mark.asyncio
async def test_attach_files_bulk_oversized_bulk(server, default_user, sarah_agent, default_source):
    """Test bulk attach when trying to attach more files than MAX_FILES_OPEN allows."""
    from letta.constants import MAX_FILES_OPEN

    # Request 3 more files than the open-file limit permits.
    total = MAX_FILES_OPEN + 3
    oversized_files = []
    for idx in range(total):
        created = await server.file_manager.create_file(
            file_metadata=PydanticFileMetadata(
                file_name=f"oversized_{idx}.txt",
                organization_id=default_user.organization_id,
                source_id=default_source.id,
            ),
            actor=default_user,
            text=f"oversized {idx}",
        )
        oversized_files.append(created)

    visible_content_map = {f"oversized_{idx}.txt": f"oversized visible {idx}" for idx in range(total)}

    # Attach everything in one bulk call.
    closed_files = await server.file_agent_manager.attach_files_bulk(
        agent_id=sarah_agent.id,
        files_metadata=oversized_files,
        visible_content_map=visible_content_map,
        actor=default_user,
    )

    # Exactly the 3 excess files are reported closed…
    assert len(closed_files) == 3
    # …and the report must never repeat a name.
    assert len(closed_files) == len(set(closed_files)), f"Duplicate file names in closed_files: {closed_files}"

    # Open associations are capped at MAX_FILES_OPEN.
    open_after = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user, is_open_only=True)
    assert len(open_after) == MAX_FILES_OPEN

    # Every requested file is still attached, whether open or closed.
    all_after = await server.file_agent_manager.list_files_for_agent(sarah_agent.id, actor=default_user)
    assert len(all_after) == MAX_FILES_OPEN + 3