diff --git a/letta/server/rest_api/routers/v1/agents.py b/letta/server/rest_api/routers/v1/agents.py index b6ea04a7..f6c6e46b 100644 --- a/letta/server/rest_api/routers/v1/agents.py +++ b/letta/server/rest_api/routers/v1/agents.py @@ -332,7 +332,7 @@ async def attach_source( files = await server.file_manager.list_files(source_id, actor, include_content=True) if files: - await server.insert_files_into_context_window(agent_state=agent_state, file_metadata_with_content=files, actor=actor) + await server.agent_manager.insert_files_into_context_window(agent_state=agent_state, file_metadata_with_content=files, actor=actor) if agent_state.enable_sleeptime: source = await server.source_manager.get_source_by_id(source_id=source_id) diff --git a/letta/server/rest_api/routers/v1/sources.py b/letta/server/rest_api/routers/v1/sources.py index 47322bb4..bcd1c664 100644 --- a/letta/server/rest_api/routers/v1/sources.py +++ b/letta/server/rest_api/routers/v1/sources.py @@ -481,6 +481,4 @@ async def load_file_to_source_cloud( else: embedder = OpenAIEmbedder(embedding_config=embedding_config) file_processor = FileProcessor(file_parser=file_parser, embedder=embedder, actor=actor, using_pinecone=using_pinecone) - await file_processor.process( - server=server, agent_states=agent_states, source_id=source_id, content=content, file_metadata=file_metadata - ) + await file_processor.process(agent_states=agent_states, source_id=source_id, content=content, file_metadata=file_metadata) diff --git a/letta/server/server.py b/letta/server/server.py index be109f97..dcf604b0 100644 --- a/letta/server/server.py +++ b/letta/server/server.py @@ -42,7 +42,6 @@ from letta.schemas.embedding_config import EmbeddingConfig # openai schemas from letta.schemas.enums import JobStatus, MessageStreamStatus, ProviderCategory, ProviderType from letta.schemas.environment_variables import SandboxEnvironmentVariableCreate -from letta.schemas.file import FileMetadata from letta.schemas.group import GroupCreate, ManagerType, SleeptimeManager, VoiceSleeptimeManager from letta.schemas.job import Job, JobUpdate from letta.schemas.letta_message import LegacyLettaMessage, LettaMessage, MessageType, ToolReturnMessage @@ -79,10 +78,9 @@ from letta.server.rest_api.chat_completions_interface import ChatCompletionsStre from letta.server.rest_api.interface import StreamingServerInterface from letta.server.rest_api.utils import sse_async_generator from letta.services.agent_manager import AgentManager -from letta.services.agent_serialization_manager import AgentFileManager +from letta.services.agent_serialization_manager import AgentSerializationManager from letta.services.block_manager import BlockManager from letta.services.file_manager import FileManager -from letta.services.file_processor.chunker.line_chunker import LineChunker from letta.services.files_agents_manager import FileAgentManager from letta.services.group_manager import GroupManager from letta.services.helpers.tool_execution_helper import prepare_local_sandbox @@ -224,7 +222,18 @@ class SyncServer(Server): self.telemetry_manager = TelemetryManager() self.file_agent_manager = FileAgentManager() self.file_manager = FileManager() - self.agent_serialization_manager = AgentFileManager() + + self.agent_serialization_manager = AgentSerializationManager( + agent_manager=self.agent_manager, + tool_manager=self.tool_manager, + source_manager=self.source_manager, + block_manager=self.block_manager, + group_manager=self.group_manager, + mcp_manager=self.mcp_manager, + file_manager=self.file_manager, + file_agent_manager=self.file_agent_manager, + message_manager=self.message_manager, + ) # A resusable httpx client timeout = httpx.Timeout(connect=10.0, read=20.0, write=10.0, pool=10.0) @@ -874,7 +883,9 @@ class SyncServer(Server): if request.source_ids: for source_id in request.source_ids: files = await self.file_manager.list_files(source_id, actor, include_content=True) - await self.insert_files_into_context_window(agent_state=main_agent, file_metadata_with_content=files, actor=actor) + await self.agent_manager.insert_files_into_context_window( + agent_state=main_agent, file_metadata_with_content=files, actor=actor + ) main_agent = await self.agent_manager.refresh_file_blocks(agent_state=main_agent, actor=actor) main_agent = await self.agent_manager.attach_missing_files_tools_async(agent_state=main_agent, actor=actor) @@ -1397,76 +1408,6 @@ class SyncServer(Server): except NoResultFound: logger.info(f"File {file_id} already removed from agent {agent_id}, skipping...") - async def insert_file_into_context_windows( - self, source_id: str, file_metadata_with_content: FileMetadata, actor: User, agent_states: Optional[List[AgentState]] = None - ) -> List[AgentState]: - """ - Insert the uploaded document into the context window of all agents - attached to the given source. - """ - agent_states = agent_states or await self.source_manager.list_attached_agents(source_id=source_id, actor=actor) - - # Return early - if not agent_states: - return [] - - logger.info(f"Inserting document into context window for source: {source_id}") - logger.info(f"Attached agents: {[a.id for a in agent_states]}") - - # Generate visible content for the file - line_chunker = LineChunker() - content_lines = line_chunker.chunk_text(file_metadata=file_metadata_with_content) - visible_content = "\n".join(content_lines) - visible_content_map = {file_metadata_with_content.file_name: visible_content} - - # Attach file to each agent using bulk method (one file per agent, but atomic per agent) - all_closed_files = await asyncio.gather( - *( - self.file_agent_manager.attach_files_bulk( - agent_id=agent_state.id, - files_metadata=[file_metadata_with_content], - visible_content_map=visible_content_map, - actor=actor, - max_files_open=agent_state.max_files_open, - ) - for agent_state in agent_states - ) - ) - # Flatten and log if any files were closed - closed_files = [file for closed_list in all_closed_files for file in closed_list] - if closed_files: - logger.info(f"LRU eviction closed {len(closed_files)} files during bulk attach: {closed_files}") - - return agent_states - - async def insert_files_into_context_window( - self, agent_state: AgentState, file_metadata_with_content: List[FileMetadata], actor: User - ) -> None: - """ - Insert the uploaded documents into the context window of an agent - attached to the given source. - """ - logger.info(f"Inserting {len(file_metadata_with_content)} documents into context window for agent_state: {agent_state.id}") - - # Generate visible content for each file - line_chunker = LineChunker() - visible_content_map = {} - for file_metadata in file_metadata_with_content: - content_lines = line_chunker.chunk_text(file_metadata=file_metadata) - visible_content_map[file_metadata.file_name] = "\n".join(content_lines) - - # Use bulk attach to avoid race conditions and duplicate LRU eviction decisions - closed_files = await self.file_agent_manager.attach_files_bulk( - agent_id=agent_state.id, - files_metadata=file_metadata_with_content, - visible_content_map=visible_content_map, - actor=actor, - max_files_open=agent_state.max_files_open, - ) - - if closed_files: - logger.info(f"LRU eviction closed {len(closed_files)} files during bulk insert: {closed_files}") - async def remove_file_from_context_windows(self, source_id: str, file_id: str, actor: User) -> None: """ Remove the document from the context window of all agents diff --git a/letta/services/agent_manager.py b/letta/services/agent_manager.py index 8bef6e4a..e8a5961a 100644 --- a/letta/services/agent_manager.py +++ b/letta/services/agent_manager.py @@ -46,6 +46,7 @@ from letta.schemas.block import Block as PydanticBlock from letta.schemas.block import BlockUpdate from letta.schemas.embedding_config import EmbeddingConfig from letta.schemas.enums import ProviderType +from letta.schemas.file import FileMetadata as PydanticFileMetadata from letta.schemas.group import Group as PydanticGroup from letta.schemas.group import ManagerType from letta.schemas.memory import ContextWindowOverview, Memory @@ -65,6 +66,7 @@ from letta.server.db import db_registry from letta.services.block_manager import BlockManager from letta.services.context_window_calculator.context_window_calculator import ContextWindowCalculator from letta.services.context_window_calculator.token_counter import AnthropicTokenCounter, TiktokenCounter +from letta.services.file_processor.chunker.line_chunker import LineChunker from letta.services.files_agents_manager import FileAgentManager from letta.services.helpers.agent_manager_helper import ( _apply_filters, @@ -2946,6 +2948,83 @@ class AgentManager: tools = result.scalars().all() return [tool.to_pydantic() for tool in tools] + # ====================================================================================================================== + # File Management + # ====================================================================================================================== + async def insert_file_into_context_windows( + self, + source_id: str, + file_metadata_with_content: PydanticFileMetadata, + actor: PydanticUser, + agent_states: Optional[List[PydanticAgentState]] = None, + ) -> List[PydanticAgentState]: + """ + Insert the uploaded document into the context window of all agents + attached to the given source. + """ + agent_states = agent_states or await self.source_manager.list_attached_agents(source_id=source_id, actor=actor) + + # Return early + if not agent_states: + return [] + + logger.info(f"Inserting document into context window for source: {source_id}") + logger.info(f"Attached agents: {[a.id for a in agent_states]}") + + # Generate visible content for the file + line_chunker = LineChunker() + content_lines = line_chunker.chunk_text(file_metadata=file_metadata_with_content) + visible_content = "\n".join(content_lines) + visible_content_map = {file_metadata_with_content.file_name: visible_content} + + # Attach file to each agent using bulk method (one file per agent, but atomic per agent) + all_closed_files = await asyncio.gather( + *( + self.file_agent_manager.attach_files_bulk( + agent_id=agent_state.id, + files_metadata=[file_metadata_with_content], + visible_content_map=visible_content_map, + actor=actor, + max_files_open=agent_state.max_files_open, + ) + for agent_state in agent_states + ) + ) + # Flatten and log if any files were closed + closed_files = [file for closed_list in all_closed_files for file in closed_list] + if closed_files: + logger.info(f"LRU eviction closed {len(closed_files)} files during bulk attach: {closed_files}") + + return agent_states + + async def insert_files_into_context_window( + self, agent_state: PydanticAgentState, file_metadata_with_content: List[PydanticFileMetadata], actor: PydanticUser + ) -> None: + """ + Insert the uploaded documents into the context window of an agent + attached to the given source. + """ + logger.info(f"Inserting {len(file_metadata_with_content)} documents into context window for agent_state: {agent_state.id}") + + # Generate visible content for each file + line_chunker = LineChunker() + visible_content_map = {} + for file_metadata in file_metadata_with_content: + content_lines = line_chunker.chunk_text(file_metadata=file_metadata) + visible_content_map[file_metadata.file_name] = "\n".join(content_lines) + + # Use bulk attach to avoid race conditions and duplicate LRU eviction decisions + closed_files = await self.file_agent_manager.attach_files_bulk( + agent_id=agent_state.id, + files_metadata=file_metadata_with_content, + visible_content_map=visible_content_map, + actor=actor, + max_files_open=agent_state.max_files_open, + ) + + if closed_files: + logger.info(f"LRU eviction closed {len(closed_files)} files during bulk insert: {closed_files}") + # ====================================================================================================================== # Tag Management # ====================================================================================================================== diff --git a/letta/services/agent_serialization_manager.py b/letta/services/agent_serialization_manager.py index e1691b4c..2612baa7 100644 --- a/letta/services/agent_serialization_manager.py +++ b/letta/services/agent_serialization_manager.py @@ -2,6 +2,7 @@ from datetime import datetime, timezone from typing import Dict, List from letta.errors import AgentFileExportError, AgentFileImportError +from letta.helpers.pinecone_utils import should_use_pinecone from letta.log import get_logger from letta.schemas.agent import AgentState, CreateAgent from letta.schemas.agent_file import ( @@ -25,21 +26,24 @@ from letta.schemas.user import User from letta.services.agent_manager import AgentManager from letta.services.block_manager import BlockManager from letta.services.file_manager import FileManager -from letta.services.file_processor.embedder.base_embedder import BaseEmbedder +from letta.services.file_processor.embedder.openai_embedder import OpenAIEmbedder +from letta.services.file_processor.embedder.pinecone_embedder import PineconeEmbedder from letta.services.file_processor.file_processor import FileProcessor -from letta.services.file_processor.parser.base_parser import FileParser +from letta.services.file_processor.parser.markitdown_parser import MarkitdownFileParser +from letta.services.file_processor.parser.mistral_parser import MistralFileParser from letta.services.files_agents_manager import FileAgentManager from letta.services.group_manager import GroupManager from letta.services.mcp_manager import MCPManager from letta.services.message_manager import MessageManager from letta.services.source_manager import SourceManager from letta.services.tool_manager import ToolManager +from letta.settings import settings from letta.utils import get_latest_alembic_revision logger = get_logger(__name__) -class AgentFileManager: +class AgentSerializationManager: """ Manages export and import of agent files between database and AgentFileSchema format. @@ -61,9 +65,6 @@ class AgentFileManager: file_manager: FileManager, file_agent_manager: FileAgentManager, message_manager: MessageManager, - embedder: BaseEmbedder, - file_parser: FileParser, - using_pinecone: bool = False, ): self.agent_manager = agent_manager self.tool_manager = tool_manager @@ -74,9 +75,8 @@ class AgentFileManager: self.file_manager = file_manager self.file_agent_manager = file_agent_manager self.message_manager = message_manager - self.embedder = embedder - self.file_parser = file_parser - self.using_pinecone = using_pinecone + self.file_parser = MistralFileParser() if settings.mistral_api_key else MarkitdownFileParser() + self.using_pinecone = should_use_pinecone() # ID mapping state for export self._db_to_file_ids: Dict[str, str] = {} @@ -428,9 +428,13 @@ class AgentFileManager: imported_count += 1 # 5. Process files for chunking/embedding (depends on files and sources) + if should_use_pinecone(): + embedder = PineconeEmbedder() + else: + embedder = OpenAIEmbedder(embedding_config=schema.agents[0].embedding_config) file_processor = FileProcessor( file_parser=self.file_parser, - embedder=self.embedder, + embedder=embedder, actor=actor, using_pinecone=self.using_pinecone, ) diff --git a/letta/services/file_processor/file_processor.py b/letta/services/file_processor/file_processor.py index 60de7c71..c69dd7eb 100644 --- a/letta/services/file_processor/file_processor.py +++ b/letta/services/file_processor/file_processor.py @@ -10,7 +10,7 @@ from letta.schemas.enums import FileProcessingStatus from letta.schemas.file import FileMetadata from letta.schemas.passage import Passage from letta.schemas.user import User -from letta.server.server import SyncServer +from letta.services.agent_manager import AgentManager from letta.services.file_manager import FileManager from letta.services.file_processor.chunker.line_chunker import LineChunker from letta.services.file_processor.chunker.llama_index_chunker import LlamaIndexChunker @@ -42,6 +42,7 @@ class FileProcessor: self.source_manager = SourceManager() self.passage_manager = PassageManager() self.job_manager = JobManager() + self.agent_manager = AgentManager() self.actor = actor self.using_pinecone = using_pinecone @@ -130,7 +131,6 @@ class FileProcessor: @trace_method async def process( self, - server: SyncServer, agent_states: list[AgentState], source_id: str, content: bytes, @@ -182,7 +182,7 @@ class FileProcessor: ) file_metadata = await self.file_manager.upsert_file_content(file_id=file_metadata.id, text=raw_markdown_text, actor=self.actor) - await server.insert_file_into_context_windows( + await self.agent_manager.insert_file_into_context_windows( source_id=source_id, file_metadata_with_content=file_metadata, actor=self.actor, diff --git a/tests/test_agent_serialization_v2.py b/tests/test_agent_serialization_v2.py index 9509f16e..2a548b68 100644 --- a/tests/test_agent_serialization_v2.py +++ b/tests/test_agent_serialization_v2.py @@ -26,11 +26,7 @@ from letta.schemas.organization import Organization from letta.schemas.source import Source from letta.schemas.user import User from letta.server.server import SyncServer -from letta.services.agent_serialization_manager import AgentFileManager -from letta.services.file_processor.embedder.openai_embedder import OpenAIEmbedder -from letta.services.file_processor.parser.markitdown_parser import MarkitdownFileParser -from letta.services.file_processor.parser.mistral_parser import MistralFileParser -from letta.settings import settings +from letta.services.agent_serialization_manager import AgentSerializationManager from tests.utils import create_tool_from_func # ------------------------------ @@ -159,8 +155,8 @@ def test_block(server: SyncServer, default_user): @pytest.fixture def agent_serialization_manager(server, default_user): - """Fixture to create AgentFileManager with all required services including file processing.""" - manager = AgentFileManager( + """Fixture to create AgentSerializationManager with all required services including file processing.""" + manager = AgentSerializationManager( agent_manager=server.agent_manager, tool_manager=server.tool_manager, source_manager=server.source_manager, @@ -170,9 +166,6 @@ def agent_serialization_manager(server, default_user): file_manager=server.file_manager, file_agent_manager=server.file_agent_manager, message_manager=server.message_manager, - embedder=OpenAIEmbedder(), - file_parser=MistralFileParser() if settings.mistral_api_key else MarkitdownFileParser(), - using_pinecone=False, ) yield manager @@ -282,7 +275,9 @@ async def agent_with_files(server: SyncServer, default_user, test_block, weather actor=default_user, ) - await server.insert_files_into_context_window(agent_state=agent_state, file_metadata_with_content=[test_file], actor=default_user) + await server.agent_manager.insert_files_into_context_window( + agent_state=agent_state, file_metadata_with_content=[test_file], actor=default_user + ) return (agent_state.id, test_source.id, test_file.id) @@ -348,7 +343,9 @@ async def create_test_agent_with_files(server: SyncServer, name: str, user: User for source_id, file_id in file_relationships: file_metadata = await server.file_manager.get_file_by_id(file_id, user) - await server.insert_files_into_context_window(agent_state=agent_state, file_metadata_with_content=[file_metadata], actor=user) + await server.agent_manager.insert_files_into_context_window( + agent_state=agent_state, file_metadata_with_content=[file_metadata], actor=user + ) return agent_state