from typing import TYPE_CHECKING, Dict, List, Optional if TYPE_CHECKING: pass # Import AgentState outside TYPE_CHECKING for @enforce_types decorator from sqlalchemy import and_, asc, delete, desc, func, nulls_last, or_, select from letta.errors import LettaInvalidArgumentError from letta.helpers.datetime_helpers import get_utc_time from letta.orm.agent import Agent as AgentModel from letta.orm.block import Block as BlockModel from letta.orm.blocks_conversations import BlocksConversations from letta.orm.conversation import Conversation as ConversationModel from letta.orm.conversation_messages import ConversationMessage as ConversationMessageModel from letta.orm.message import Message as MessageModel from letta.orm.run import Run as RunModel from letta.otel.tracing import trace_method from letta.schemas.agent import AgentState from letta.schemas.block import Block as PydanticBlock from letta.schemas.conversation import Conversation as PydanticConversation, CreateConversation, UpdateConversation from letta.schemas.letta_message import LettaMessage from letta.schemas.message import Message as PydanticMessage from letta.schemas.user import User as PydanticUser from letta.server.db import db_registry from letta.services.helpers.agent_manager_helper import validate_agent_exists_async from letta.utils import enforce_types class ConversationManager: """Manager class to handle business logic related to Conversations.""" @staticmethod def _serialize_model_settings(model_settings) -> Optional[dict]: """Serialize model settings for DB storage, stripping max_output_tokens if not explicitly set. Uses model_dump() to preserve all fields (including the provider_type discriminator), but removes max_output_tokens when it wasn't explicitly provided by the caller so we don't persist the Pydantic default (4096) and later overwrite the agent's own value. """ if model_settings is None: return None data = model_settings.model_dump() if "max_output_tokens" not in model_settings.model_fields_set: data.pop("max_output_tokens", None) return data @enforce_types @trace_method async def create_conversation( self, agent_id: str, conversation_create: CreateConversation, actor: PydanticUser, ) -> PydanticConversation: """Create a new conversation for an agent. Args: agent_id: The ID of the agent this conversation belongs to conversation_create: The conversation creation request, optionally including isolated_block_labels for conversation-specific memory blocks actor: The user performing the action Returns: The created conversation with isolated_block_ids if any were created """ async with db_registry.async_session() as session: # Validate that the agent exists before creating the conversation await validate_agent_exists_async(session, agent_id, actor) conversation = ConversationModel( agent_id=agent_id, summary=conversation_create.summary, organization_id=actor.organization_id, model=conversation_create.model, model_settings=self._serialize_model_settings(conversation_create.model_settings), ) await conversation.create_async(session, actor=actor) # Handle isolated blocks if requested isolated_block_ids = [] if conversation_create.isolated_block_labels: isolated_block_ids = await self._create_isolated_blocks( session=session, conversation=conversation, agent_id=agent_id, isolated_block_labels=conversation_create.isolated_block_labels, actor=actor, ) pydantic_conversation = conversation.to_pydantic() pydantic_conversation.isolated_block_ids = isolated_block_ids # Compile and persist the initial system message for this conversation # This ensures the conversation captures the latest memory block state at creation time await self.compile_and_save_system_message_for_conversation( conversation_id=pydantic_conversation.id, agent_id=agent_id, actor=actor, ) return pydantic_conversation @trace_method async def compile_and_save_system_message_for_conversation( self, conversation_id: str, agent_id: str, actor: PydanticUser, agent_state: Optional["AgentState"] = None, message_manager: Optional[object] = None, ) -> PydanticMessage: """Compile and persist the initial system message for a conversation. This recompiles the system prompt with the latest memory block values and metadata, ensuring the conversation starts with an up-to-date system message. This is the single source of truth for creating a conversation's system message — used both at conversation creation time and as a fallback when a conversation has no messages yet. Args: conversation_id: The conversation to add the system message to agent_id: The agent this conversation belongs to actor: The user performing the action agent_state: Optional pre-loaded agent state (avoids redundant DB load) message_manager: Optional pre-loaded MessageManager instance Returns: The persisted system message """ # Lazy imports to avoid circular dependencies from letta.prompts.prompt_generator import PromptGenerator from letta.services.message_manager import MessageManager from letta.services.passage_manager import PassageManager if message_manager is None: message_manager = MessageManager() if agent_state is None: from letta.services.agent_manager import AgentManager agent_state = await AgentManager().get_agent_by_id_async( agent_id=agent_id, include_relationships=["memory", "sources"], actor=actor, ) passage_manager = PassageManager() num_messages = await message_manager.size_async(actor=actor, agent_id=agent_id) num_archival_memories = await passage_manager.agent_passage_size_async(actor=actor, agent_id=agent_id) # Compile the system message with current memory state system_message_str = await PromptGenerator.compile_system_message_async( system_prompt=agent_state.system, in_context_memory=agent_state.memory, in_context_memory_last_edit=get_utc_time(), timezone=agent_state.timezone, user_defined_variables=None, append_icm_if_missing=True, previous_message_count=num_messages, archival_memory_size=num_archival_memories, sources=agent_state.sources, max_files_open=agent_state.max_files_open, ) system_message = PydanticMessage.dict_to_message( agent_id=agent_id, model=agent_state.llm_config.model, openai_message_dict={"role": "system", "content": system_message_str}, ) # Persist the new system message persisted_messages = await message_manager.create_many_messages_async([system_message], actor=actor) system_message = persisted_messages[0] # Add it to the conversation tracking at position 0 await self.add_messages_to_conversation( conversation_id=conversation_id, agent_id=agent_id, message_ids=[system_message.id], actor=actor, starting_position=0, ) return system_message @enforce_types @trace_method async def get_conversation_by_id( self, conversation_id: str, actor: PydanticUser, ) -> PydanticConversation: """Retrieve a conversation by its ID, including in-context message IDs.""" async with db_registry.async_session() as session: conversation = await ConversationModel.read_async( db_session=session, identifier=conversation_id, actor=actor, check_is_deleted=True, ) # Get the in-context message IDs for this conversation message_ids = await self.get_message_ids_for_conversation( conversation_id=conversation_id, actor=actor, ) # Build the pydantic model with in_context_message_ids pydantic_conversation = conversation.to_pydantic() pydantic_conversation.in_context_message_ids = message_ids return pydantic_conversation @enforce_types @trace_method async def list_conversations( self, agent_id: Optional[str], actor: PydanticUser, limit: int = 50, after: Optional[str] = None, summary_search: Optional[str] = None, ascending: bool = False, sort_by: str = "created_at", ) -> List[PydanticConversation]: """List conversations for an agent (or all conversations) with cursor-based pagination. Args: agent_id: The agent ID to list conversations for (optional - returns all if not provided) actor: The user performing the action limit: Maximum number of conversations to return after: Cursor for pagination (conversation ID) summary_search: Optional text to search for within the summary field ascending: Sort order (True for oldest first, False for newest first) sort_by: Field to sort by ("created_at" or "last_run_completion") Returns: List of conversations matching the criteria """ async with db_registry.async_session() as session: # Build base query with optional join for last_run_completion if sort_by == "last_run_completion": # Subquery to get the latest completed_at for each conversation latest_run_subquery = ( select(RunModel.conversation_id, func.max(RunModel.completed_at).label("last_run_completion")) .where(RunModel.conversation_id.isnot(None)) .group_by(RunModel.conversation_id) .subquery() ) # Join conversations with the subquery stmt = select(ConversationModel).outerjoin( latest_run_subquery, ConversationModel.id == latest_run_subquery.c.conversation_id ) sort_column = latest_run_subquery.c.last_run_completion sort_nulls_last = True else: # Simple query for created_at stmt = select(ConversationModel) sort_column = ConversationModel.created_at sort_nulls_last = False # Build where conditions conditions = [ ConversationModel.organization_id == actor.organization_id, ConversationModel.is_deleted == False, ] # Add agent_id filter if provided if agent_id is not None: conditions.append(ConversationModel.agent_id == agent_id) # Add summary search filter if provided if summary_search: conditions.extend( [ ConversationModel.summary.isnot(None), ConversationModel.summary.contains(summary_search), ] ) stmt = stmt.where(and_(*conditions)) # Handle cursor pagination if after: # Get the sort value for the cursor conversation if sort_by == "last_run_completion": cursor_query = ( select(ConversationModel.id, func.max(RunModel.completed_at).label("last_run_completion")) .outerjoin(RunModel, ConversationModel.id == RunModel.conversation_id) .where(ConversationModel.id == after) .group_by(ConversationModel.id) ) result = (await session.execute(cursor_query)).first() if result: after_id, after_sort_value = result # Apply cursor filter if after_sort_value is None: # Cursor is at NULL - if ascending, get non-NULLs or NULLs with greater ID if ascending: stmt = stmt.where( or_(and_(sort_column.is_(None), ConversationModel.id > after_id), sort_column.isnot(None)) ) else: # If descending, get NULLs with smaller ID stmt = stmt.where(and_(sort_column.is_(None), ConversationModel.id < after_id)) else: # Cursor is at non-NULL if ascending: # Moving forward: greater values or same value with greater ID stmt = stmt.where( and_( sort_column.isnot(None), or_( sort_column > after_sort_value, and_(sort_column == after_sort_value, ConversationModel.id > after_id), ), ) ) else: # Moving backward: smaller values or NULLs or same value with smaller ID stmt = stmt.where( or_( sort_column.is_(None), sort_column < after_sort_value, and_(sort_column == after_sort_value, ConversationModel.id < after_id), ) ) else: # Simple created_at cursor after_conv = await ConversationModel.read_async( db_session=session, identifier=after, actor=actor, ) if ascending: stmt = stmt.where(ConversationModel.created_at > after_conv.created_at) else: stmt = stmt.where(ConversationModel.created_at < after_conv.created_at) # Apply ordering order_fn = asc if ascending else desc if sort_nulls_last: stmt = stmt.order_by(nulls_last(order_fn(sort_column)), order_fn(ConversationModel.id)) else: stmt = stmt.order_by(order_fn(sort_column), order_fn(ConversationModel.id)) stmt = stmt.limit(limit) result = await session.execute(stmt) conversations = result.scalars().all() return [conv.to_pydantic() for conv in conversations] @enforce_types @trace_method async def update_conversation( self, conversation_id: str, conversation_update: UpdateConversation, actor: PydanticUser, ) -> PydanticConversation: """Update a conversation.""" async with db_registry.async_session() as session: conversation = await ConversationModel.read_async( db_session=session, identifier=conversation_id, actor=actor, check_is_deleted=True, ) # Set attributes on the model update_data = conversation_update.model_dump(exclude_none=True) for key, value in update_data.items(): # model_settings needs to be serialized to dict for the JSON column if key == "model_settings" and value is not None: setattr( conversation, key, self._serialize_model_settings(conversation_update.model_settings) if conversation_update.model_settings else value, ) else: setattr(conversation, key, value) # Commit the update updated_conversation = await conversation.update_async( db_session=session, actor=actor, ) return updated_conversation.to_pydantic() @enforce_types @trace_method async def delete_conversation( self, conversation_id: str, actor: PydanticUser, ) -> None: """Soft delete a conversation and hard-delete its isolated blocks.""" async with db_registry.async_session() as session: conversation = await ConversationModel.read_async( db_session=session, identifier=conversation_id, actor=actor, check_is_deleted=True, ) # Get isolated blocks before modifying conversation isolated_blocks = list(conversation.isolated_blocks) # Soft delete the conversation first conversation.is_deleted = True await conversation.update_async(db_session=session, actor=actor) # Hard-delete isolated blocks (Block model doesn't support soft-delete) # Following same pattern as block_manager.delete_block_async for block in isolated_blocks: # Delete junction table entry first await session.execute(delete(BlocksConversations).where(BlocksConversations.block_id == block.id)) await session.flush() # Then hard-delete the block await block.hard_delete_async(db_session=session, actor=actor) # ==================== Message Management Methods ==================== @enforce_types @trace_method async def get_message_ids_for_conversation( self, conversation_id: str, actor: PydanticUser, ) -> List[str]: """ Get ordered message IDs for a conversation. Returns message IDs ordered by position in the conversation. Only returns messages that are currently in_context. """ async with db_registry.async_session() as session: query = ( select(ConversationMessageModel.message_id) .where( ConversationMessageModel.conversation_id == conversation_id, ConversationMessageModel.organization_id == actor.organization_id, ConversationMessageModel.in_context == True, ConversationMessageModel.is_deleted == False, ) .order_by(ConversationMessageModel.position) ) result = await session.execute(query) return list(result.scalars().all()) @enforce_types @trace_method async def get_messages_for_conversation( self, conversation_id: str, actor: PydanticUser, ) -> List[PydanticMessage]: """ Get ordered Message objects for a conversation. Returns full Message objects ordered by position in the conversation. Only returns messages that are currently in_context. """ async with db_registry.async_session() as session: query = ( select(MessageModel) .join( ConversationMessageModel, MessageModel.id == ConversationMessageModel.message_id, ) .where( ConversationMessageModel.conversation_id == conversation_id, ConversationMessageModel.organization_id == actor.organization_id, ConversationMessageModel.in_context == True, ConversationMessageModel.is_deleted == False, ) .order_by(ConversationMessageModel.position) ) result = await session.execute(query) return [msg.to_pydantic() for msg in result.scalars().all()] @enforce_types @trace_method async def add_messages_to_conversation( self, conversation_id: str, agent_id: str, message_ids: List[str], actor: PydanticUser, starting_position: Optional[int] = None, ) -> None: """ Add messages to a conversation's tracking table. Creates ConversationMessage entries with auto-incrementing positions. Args: conversation_id: The conversation to add messages to agent_id: The agent ID message_ids: List of message IDs to add actor: The user performing the action starting_position: Optional starting position (defaults to next available) """ if not message_ids: return async with db_registry.async_session() as session: # Get starting position if not provided if starting_position is None: query = select(func.coalesce(func.max(ConversationMessageModel.position), -1)).where( ConversationMessageModel.conversation_id == conversation_id, ConversationMessageModel.organization_id == actor.organization_id, ) result = await session.execute(query) max_position = result.scalar() # Use explicit None check instead of `or` to handle position=0 correctly if max_position is None: max_position = -1 starting_position = max_position + 1 # Create ConversationMessage entries for i, message_id in enumerate(message_ids): conv_msg = ConversationMessageModel( conversation_id=conversation_id, agent_id=agent_id, message_id=message_id, position=starting_position + i, in_context=True, organization_id=actor.organization_id, ) session.add(conv_msg) await session.commit() @enforce_types @trace_method async def update_in_context_messages( self, conversation_id: str, in_context_message_ids: List[str], actor: PydanticUser, ) -> None: """ Update which messages are in context for a conversation. Sets in_context=True for messages in the list, False for others. Also updates positions to preserve the order specified in in_context_message_ids. This is critical for correctness: when summarization inserts a summary message that needs to appear before an approval request, the positions must reflect the intended order, not the insertion order. Args: conversation_id: The conversation to update in_context_message_ids: List of message IDs in the desired order actor: The user performing the action """ async with db_registry.async_session() as session: # Get all conversation messages for this conversation query = select(ConversationMessageModel).where( ConversationMessageModel.conversation_id == conversation_id, ConversationMessageModel.organization_id == actor.organization_id, ConversationMessageModel.is_deleted == False, ) result = await session.execute(query) conv_messages = result.scalars().all() # Build lookup dict conv_msg_dict = {cm.message_id: cm for cm in conv_messages} # Update in_context status AND positions in_context_set = set(in_context_message_ids) for conv_msg in conv_messages: conv_msg.in_context = conv_msg.message_id in in_context_set # Update positions to match the order in in_context_message_ids # This ensures ORDER BY position returns messages in the correct order for position, message_id in enumerate(in_context_message_ids): if message_id in conv_msg_dict: conv_msg_dict[message_id].position = position await session.commit() @enforce_types @trace_method async def list_conversation_messages( self, conversation_id: str, actor: PydanticUser, limit: Optional[int] = 100, before: Optional[str] = None, after: Optional[str] = None, reverse: bool = False, group_id: Optional[str] = None, include_err: Optional[bool] = None, ) -> List[LettaMessage]: """ List all messages in a conversation with pagination support. Unlike get_messages_for_conversation, this returns ALL messages (not just in_context) and supports cursor-based pagination. Args: conversation_id: The conversation to list messages for actor: The user performing the action limit: Maximum number of messages to return before: Return messages before this message ID after: Return messages after this message ID reverse: If True, return messages in descending order (newest first) group_id: Optional group ID to filter messages by include_err: Optional boolean to include error messages and error statuses Returns: List of LettaMessage objects """ async with db_registry.async_session() as session: # Build base query joining Message with ConversationMessage query = ( select(MessageModel) .join( ConversationMessageModel, MessageModel.id == ConversationMessageModel.message_id, ) .where( ConversationMessageModel.conversation_id == conversation_id, ConversationMessageModel.organization_id == actor.organization_id, ConversationMessageModel.is_deleted == False, ) ) # Filter by group_id if provided if group_id: query = query.where(MessageModel.group_id == group_id) # Handle cursor-based pagination if before: # Get the position of the cursor message cursor_query = select(ConversationMessageModel.position).where( ConversationMessageModel.conversation_id == conversation_id, ConversationMessageModel.message_id == before, ) cursor_result = await session.execute(cursor_query) cursor_position = cursor_result.scalar_one_or_none() if cursor_position is not None: query = query.where(ConversationMessageModel.position < cursor_position) if after: # Get the position of the cursor message cursor_query = select(ConversationMessageModel.position).where( ConversationMessageModel.conversation_id == conversation_id, ConversationMessageModel.message_id == after, ) cursor_result = await session.execute(cursor_query) cursor_position = cursor_result.scalar_one_or_none() if cursor_position is not None: query = query.where(ConversationMessageModel.position > cursor_position) # Order by position if reverse: query = query.order_by(ConversationMessageModel.position.desc()) else: query = query.order_by(ConversationMessageModel.position.asc()) # Apply limit if limit is not None: query = query.limit(limit) result = await session.execute(query) messages = [msg.to_pydantic() for msg in result.scalars().all()] # Convert to LettaMessages (reverse=False keeps sub-messages in natural order) return PydanticMessage.to_letta_messages_from_list( messages, reverse=False, include_err=include_err, text_is_assistant_message=True ) # ==================== Isolated Blocks Methods ==================== async def _create_isolated_blocks( self, session, conversation: ConversationModel, agent_id: str, isolated_block_labels: List[str], actor: PydanticUser, ) -> List[str]: """Create conversation-specific copies of blocks for isolated labels. Args: session: The database session conversation: The conversation model (must be created but not yet committed) agent_id: The agent ID to get source blocks from isolated_block_labels: List of block labels to isolate actor: The user performing the action Returns: List of created block IDs Raises: LettaInvalidArgumentError: If a block label is not found on the agent """ # Get the agent with its blocks agent = await AgentModel.read_async(db_session=session, identifier=agent_id, actor=actor) # Map of label -> agent block agent_blocks_by_label = {block.label: block for block in agent.core_memory} created_block_ids = [] for label in isolated_block_labels: if label not in agent_blocks_by_label: raise LettaInvalidArgumentError( f"Block with label '{label}' not found on agent '{agent_id}'", argument_name="isolated_block_labels", ) source_block = agent_blocks_by_label[label] # Create a copy of the block with a new ID using Pydantic schema (which auto-generates ID) new_block_pydantic = PydanticBlock( label=source_block.label, description=source_block.description, value=source_block.value, limit=source_block.limit, metadata=source_block.metadata_, read_only=source_block.read_only, ) # Convert to ORM model block_data = new_block_pydantic.model_dump(to_orm=True, exclude_none=True) new_block = BlockModel(**block_data, organization_id=actor.organization_id) await new_block.create_async(session, actor=actor) # Create the junction table entry blocks_conv = BlocksConversations( conversation_id=conversation.id, block_id=new_block.id, block_label=label, ) session.add(blocks_conv) created_block_ids.append(new_block.id) return created_block_ids @enforce_types @trace_method async def get_isolated_blocks_for_conversation( self, conversation_id: str, actor: PydanticUser, ) -> Dict[str, PydanticBlock]: """Get isolated blocks for a conversation, keyed by label. Args: conversation_id: The conversation ID actor: The user performing the action Returns: Dictionary mapping block labels to their conversation-specific blocks """ async with db_registry.async_session() as session: conversation = await ConversationModel.read_async( db_session=session, identifier=conversation_id, actor=actor, check_is_deleted=True, ) return {block.label: block.to_pydantic() for block in conversation.isolated_blocks} @enforce_types @trace_method async def apply_isolated_blocks_to_agent_state( self, agent_state: "AgentState", conversation_id: str, actor: PydanticUser, ) -> "AgentState": """Apply conversation-specific block overrides to an agent state. This method modifies the agent_state.memory to replace blocks that have conversation-specific isolated versions. Args: agent_state: The agent state to modify (will be modified in place) conversation_id: The conversation ID to get isolated blocks from actor: The user performing the action Returns: The modified agent state (same object, modified in place) """ from letta.schemas.memory import Memory # Get conversation's isolated blocks isolated_blocks = await self.get_isolated_blocks_for_conversation( conversation_id=conversation_id, actor=actor, ) if not isolated_blocks: return agent_state # Override agent's blocks with conversation-specific blocks memory_blocks = [] for block in agent_state.memory.blocks: if block.label in isolated_blocks: memory_blocks.append(isolated_blocks[block.label]) else: memory_blocks.append(block) # Create new Memory with overridden blocks agent_state.memory = Memory( blocks=memory_blocks, file_blocks=agent_state.memory.file_blocks, agent_type=agent_state.memory.agent_type, git_enabled=agent_state.memory.git_enabled, ) return agent_state