Files
letta-server/letta/services/conversation_manager.py
Shubham Naik f082fd5061 feat: add order_by and order params to /v1/conversations list endpoin… (#9599)
* feat: add order_by and order params to /v1/conversations list endpoint [LET-7628]

Added sorting support to the conversations list endpoint, matching the pattern from /v1/agents.

**API Changes:**
- Added `order` query param: "asc" or "desc" (default: "desc")
- Added `order_by` query param: "created_at" or "last_run_completion" (default: "created_at")

**Implementation:**

**created_at ordering:**
- Simple ORDER BY on ConversationModel.created_at
- No join required, fast query
- Nulls not applicable (created_at always set)

**last_run_completion ordering:**
- LEFT JOIN with runs table using subquery
- Subquery: MAX(completed_at) grouped by conversation_id
- Uses OUTER JOIN so conversations with no runs are included
- Nulls last ordering (conversations with no runs go to end)
- Index on runs.conversation_id ensures performant join

**Pagination:**
- Cursor-based pagination with `after` parameter
- Handles null values correctly for last_run_completion
- For created_at: simple timestamp comparison
- For last_run_completion: complex null-aware cursor logic

**Performance:**
- Existing index: `ix_runs_conversation_id` on runs table
- Subquery with GROUP BY is efficient for this use case
- OUTER JOIN ensures conversations without runs are included

**Follows agents pattern:**
- Same parameter names (order, order_by)
- Same Literal types and defaults
- Converts "asc"/"desc" to ascending boolean internally

🐾 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* chore: order

---------

Co-authored-by: Letta <noreply@letta.com>
2026-02-24 10:55:26 -08:00

718 lines
29 KiB
Python

from typing import TYPE_CHECKING, Dict, List, Optional
if TYPE_CHECKING:
pass
# Import AgentState outside TYPE_CHECKING for @enforce_types decorator
from sqlalchemy import and_, asc, delete, desc, func, nulls_last, or_, select
from letta.errors import LettaInvalidArgumentError
from letta.orm.agent import Agent as AgentModel
from letta.orm.block import Block as BlockModel
from letta.orm.blocks_conversations import BlocksConversations
from letta.orm.conversation import Conversation as ConversationModel
from letta.orm.conversation_messages import ConversationMessage as ConversationMessageModel
from letta.orm.message import Message as MessageModel
from letta.orm.run import Run as RunModel
from letta.otel.tracing import trace_method
from letta.schemas.agent import AgentState
from letta.schemas.block import Block as PydanticBlock
from letta.schemas.conversation import Conversation as PydanticConversation, CreateConversation, UpdateConversation
from letta.schemas.letta_message import LettaMessage
from letta.schemas.message import Message as PydanticMessage
from letta.schemas.user import User as PydanticUser
from letta.server.db import db_registry
from letta.services.helpers.agent_manager_helper import validate_agent_exists_async
from letta.utils import enforce_types
class ConversationManager:
"""Manager class to handle business logic related to Conversations."""
@enforce_types
@trace_method
async def create_conversation(
self,
agent_id: str,
conversation_create: CreateConversation,
actor: PydanticUser,
) -> PydanticConversation:
"""Create a new conversation for an agent.
Args:
agent_id: The ID of the agent this conversation belongs to
conversation_create: The conversation creation request, optionally including
isolated_block_labels for conversation-specific memory blocks
actor: The user performing the action
Returns:
The created conversation with isolated_block_ids if any were created
"""
async with db_registry.async_session() as session:
# Validate that the agent exists before creating the conversation
await validate_agent_exists_async(session, agent_id, actor)
conversation = ConversationModel(
agent_id=agent_id,
summary=conversation_create.summary,
organization_id=actor.organization_id,
model=conversation_create.model,
model_settings=conversation_create.model_settings.model_dump() if conversation_create.model_settings else None,
)
await conversation.create_async(session, actor=actor)
# Handle isolated blocks if requested
isolated_block_ids = []
if conversation_create.isolated_block_labels:
isolated_block_ids = await self._create_isolated_blocks(
session=session,
conversation=conversation,
agent_id=agent_id,
isolated_block_labels=conversation_create.isolated_block_labels,
actor=actor,
)
pydantic_conversation = conversation.to_pydantic()
pydantic_conversation.isolated_block_ids = isolated_block_ids
return pydantic_conversation
@enforce_types
@trace_method
async def get_conversation_by_id(
self,
conversation_id: str,
actor: PydanticUser,
) -> PydanticConversation:
"""Retrieve a conversation by its ID, including in-context message IDs."""
async with db_registry.async_session() as session:
conversation = await ConversationModel.read_async(
db_session=session,
identifier=conversation_id,
actor=actor,
check_is_deleted=True,
)
# Get the in-context message IDs for this conversation
message_ids = await self.get_message_ids_for_conversation(
conversation_id=conversation_id,
actor=actor,
)
# Build the pydantic model with in_context_message_ids
pydantic_conversation = conversation.to_pydantic()
pydantic_conversation.in_context_message_ids = message_ids
return pydantic_conversation
@enforce_types
@trace_method
async def list_conversations(
self,
agent_id: Optional[str],
actor: PydanticUser,
limit: int = 50,
after: Optional[str] = None,
summary_search: Optional[str] = None,
ascending: bool = False,
sort_by: str = "created_at",
) -> List[PydanticConversation]:
"""List conversations for an agent (or all conversations) with cursor-based pagination.
Args:
agent_id: The agent ID to list conversations for (optional - returns all if not provided)
actor: The user performing the action
limit: Maximum number of conversations to return
after: Cursor for pagination (conversation ID)
summary_search: Optional text to search for within the summary field
ascending: Sort order (True for oldest first, False for newest first)
sort_by: Field to sort by ("created_at" or "last_run_completion")
Returns:
List of conversations matching the criteria
"""
async with db_registry.async_session() as session:
# Build base query with optional join for last_run_completion
if sort_by == "last_run_completion":
# Subquery to get the latest completed_at for each conversation
latest_run_subquery = (
select(
RunModel.conversation_id,
func.max(RunModel.completed_at).label("last_run_completion")
)
.where(RunModel.conversation_id.isnot(None))
.group_by(RunModel.conversation_id)
.subquery()
)
# Join conversations with the subquery
stmt = (
select(ConversationModel)
.outerjoin(
latest_run_subquery,
ConversationModel.id == latest_run_subquery.c.conversation_id
)
)
sort_column = latest_run_subquery.c.last_run_completion
sort_nulls_last = True
else:
# Simple query for created_at
stmt = select(ConversationModel)
sort_column = ConversationModel.created_at
sort_nulls_last = False
# Build where conditions
conditions = [
ConversationModel.organization_id == actor.organization_id,
ConversationModel.is_deleted == False,
]
# Add agent_id filter if provided
if agent_id is not None:
conditions.append(ConversationModel.agent_id == agent_id)
# Add summary search filter if provided
if summary_search:
conditions.extend([
ConversationModel.summary.isnot(None),
ConversationModel.summary.contains(summary_search),
])
stmt = stmt.where(and_(*conditions))
# Handle cursor pagination
if after:
# Get the sort value for the cursor conversation
if sort_by == "last_run_completion":
cursor_query = (
select(
ConversationModel.id,
func.max(RunModel.completed_at).label("last_run_completion")
)
.outerjoin(RunModel, ConversationModel.id == RunModel.conversation_id)
.where(ConversationModel.id == after)
.group_by(ConversationModel.id)
)
result = (await session.execute(cursor_query)).first()
if result:
after_id, after_sort_value = result
# Apply cursor filter
if after_sort_value is None:
# Cursor is at NULL - if ascending, get non-NULLs or NULLs with greater ID
if ascending:
stmt = stmt.where(
or_(
and_(sort_column.is_(None), ConversationModel.id > after_id),
sort_column.isnot(None)
)
)
else:
# If descending, get NULLs with smaller ID
stmt = stmt.where(
and_(sort_column.is_(None), ConversationModel.id < after_id)
)
else:
# Cursor is at non-NULL
if ascending:
# Moving forward: greater values or same value with greater ID
stmt = stmt.where(
and_(
sort_column.isnot(None),
or_(
sort_column > after_sort_value,
and_(sort_column == after_sort_value, ConversationModel.id > after_id)
)
)
)
else:
# Moving backward: smaller values or NULLs or same value with smaller ID
stmt = stmt.where(
or_(
sort_column.is_(None),
sort_column < after_sort_value,
and_(sort_column == after_sort_value, ConversationModel.id < after_id)
)
)
else:
# Simple created_at cursor
after_conv = await ConversationModel.read_async(
db_session=session,
identifier=after,
actor=actor,
)
if ascending:
stmt = stmt.where(ConversationModel.created_at > after_conv.created_at)
else:
stmt = stmt.where(ConversationModel.created_at < after_conv.created_at)
# Apply ordering
order_fn = asc if ascending else desc
if sort_nulls_last:
stmt = stmt.order_by(nulls_last(order_fn(sort_column)), order_fn(ConversationModel.id))
else:
stmt = stmt.order_by(order_fn(sort_column), order_fn(ConversationModel.id))
stmt = stmt.limit(limit)
result = await session.execute(stmt)
conversations = result.scalars().all()
return [conv.to_pydantic() for conv in conversations]
@enforce_types
@trace_method
async def update_conversation(
self,
conversation_id: str,
conversation_update: UpdateConversation,
actor: PydanticUser,
) -> PydanticConversation:
"""Update a conversation."""
async with db_registry.async_session() as session:
conversation = await ConversationModel.read_async(
db_session=session,
identifier=conversation_id,
actor=actor,
check_is_deleted=True,
)
# Set attributes on the model
update_data = conversation_update.model_dump(exclude_none=True)
for key, value in update_data.items():
# model_settings needs to be serialized to dict for the JSON column
if key == "model_settings" and value is not None:
setattr(conversation, key, conversation_update.model_settings.model_dump() if conversation_update.model_settings else value)
else:
setattr(conversation, key, value)
# Commit the update
updated_conversation = await conversation.update_async(
db_session=session,
actor=actor,
)
return updated_conversation.to_pydantic()
@enforce_types
@trace_method
async def delete_conversation(
self,
conversation_id: str,
actor: PydanticUser,
) -> None:
"""Soft delete a conversation and hard-delete its isolated blocks."""
async with db_registry.async_session() as session:
conversation = await ConversationModel.read_async(
db_session=session,
identifier=conversation_id,
actor=actor,
check_is_deleted=True,
)
# Get isolated blocks before modifying conversation
isolated_blocks = list(conversation.isolated_blocks)
# Soft delete the conversation first
conversation.is_deleted = True
await conversation.update_async(db_session=session, actor=actor)
# Hard-delete isolated blocks (Block model doesn't support soft-delete)
# Following same pattern as block_manager.delete_block_async
for block in isolated_blocks:
# Delete junction table entry first
await session.execute(delete(BlocksConversations).where(BlocksConversations.block_id == block.id))
await session.flush()
# Then hard-delete the block
await block.hard_delete_async(db_session=session, actor=actor)
# ==================== Message Management Methods ====================
@enforce_types
@trace_method
async def get_message_ids_for_conversation(
self,
conversation_id: str,
actor: PydanticUser,
) -> List[str]:
"""
Get ordered message IDs for a conversation.
Returns message IDs ordered by position in the conversation.
Only returns messages that are currently in_context.
"""
async with db_registry.async_session() as session:
query = (
select(ConversationMessageModel.message_id)
.where(
ConversationMessageModel.conversation_id == conversation_id,
ConversationMessageModel.organization_id == actor.organization_id,
ConversationMessageModel.in_context == True,
ConversationMessageModel.is_deleted == False,
)
.order_by(ConversationMessageModel.position)
)
result = await session.execute(query)
return list(result.scalars().all())
@enforce_types
@trace_method
async def get_messages_for_conversation(
self,
conversation_id: str,
actor: PydanticUser,
) -> List[PydanticMessage]:
"""
Get ordered Message objects for a conversation.
Returns full Message objects ordered by position in the conversation.
Only returns messages that are currently in_context.
"""
async with db_registry.async_session() as session:
query = (
select(MessageModel)
.join(
ConversationMessageModel,
MessageModel.id == ConversationMessageModel.message_id,
)
.where(
ConversationMessageModel.conversation_id == conversation_id,
ConversationMessageModel.organization_id == actor.organization_id,
ConversationMessageModel.in_context == True,
ConversationMessageModel.is_deleted == False,
)
.order_by(ConversationMessageModel.position)
)
result = await session.execute(query)
return [msg.to_pydantic() for msg in result.scalars().all()]
@enforce_types
@trace_method
async def add_messages_to_conversation(
self,
conversation_id: str,
agent_id: str,
message_ids: List[str],
actor: PydanticUser,
starting_position: Optional[int] = None,
) -> None:
"""
Add messages to a conversation's tracking table.
Creates ConversationMessage entries with auto-incrementing positions.
Args:
conversation_id: The conversation to add messages to
agent_id: The agent ID
message_ids: List of message IDs to add
actor: The user performing the action
starting_position: Optional starting position (defaults to next available)
"""
if not message_ids:
return
async with db_registry.async_session() as session:
# Get starting position if not provided
if starting_position is None:
query = select(func.coalesce(func.max(ConversationMessageModel.position), -1)).where(
ConversationMessageModel.conversation_id == conversation_id,
ConversationMessageModel.organization_id == actor.organization_id,
)
result = await session.execute(query)
max_position = result.scalar()
# Use explicit None check instead of `or` to handle position=0 correctly
if max_position is None:
max_position = -1
starting_position = max_position + 1
# Create ConversationMessage entries
for i, message_id in enumerate(message_ids):
conv_msg = ConversationMessageModel(
conversation_id=conversation_id,
agent_id=agent_id,
message_id=message_id,
position=starting_position + i,
in_context=True,
organization_id=actor.organization_id,
)
session.add(conv_msg)
await session.commit()
@enforce_types
@trace_method
async def update_in_context_messages(
self,
conversation_id: str,
in_context_message_ids: List[str],
actor: PydanticUser,
) -> None:
"""
Update which messages are in context for a conversation.
Sets in_context=True for messages in the list, False for others.
Also updates positions to preserve the order specified in in_context_message_ids.
This is critical for correctness: when summarization inserts a summary message
that needs to appear before an approval request, the positions must reflect
the intended order, not the insertion order.
Args:
conversation_id: The conversation to update
in_context_message_ids: List of message IDs in the desired order
actor: The user performing the action
"""
async with db_registry.async_session() as session:
# Get all conversation messages for this conversation
query = select(ConversationMessageModel).where(
ConversationMessageModel.conversation_id == conversation_id,
ConversationMessageModel.organization_id == actor.organization_id,
ConversationMessageModel.is_deleted == False,
)
result = await session.execute(query)
conv_messages = result.scalars().all()
# Build lookup dict
conv_msg_dict = {cm.message_id: cm for cm in conv_messages}
# Update in_context status AND positions
in_context_set = set(in_context_message_ids)
for conv_msg in conv_messages:
conv_msg.in_context = conv_msg.message_id in in_context_set
# Update positions to match the order in in_context_message_ids
# This ensures ORDER BY position returns messages in the correct order
for position, message_id in enumerate(in_context_message_ids):
if message_id in conv_msg_dict:
conv_msg_dict[message_id].position = position
await session.commit()
@enforce_types
@trace_method
async def list_conversation_messages(
self,
conversation_id: str,
actor: PydanticUser,
limit: Optional[int] = 100,
before: Optional[str] = None,
after: Optional[str] = None,
reverse: bool = False,
group_id: Optional[str] = None,
include_err: Optional[bool] = None,
) -> List[LettaMessage]:
"""
List all messages in a conversation with pagination support.
Unlike get_messages_for_conversation, this returns ALL messages
(not just in_context) and supports cursor-based pagination.
Args:
conversation_id: The conversation to list messages for
actor: The user performing the action
limit: Maximum number of messages to return
before: Return messages before this message ID
after: Return messages after this message ID
reverse: If True, return messages in descending order (newest first)
group_id: Optional group ID to filter messages by
include_err: Optional boolean to include error messages and error statuses
Returns:
List of LettaMessage objects
"""
async with db_registry.async_session() as session:
# Build base query joining Message with ConversationMessage
query = (
select(MessageModel)
.join(
ConversationMessageModel,
MessageModel.id == ConversationMessageModel.message_id,
)
.where(
ConversationMessageModel.conversation_id == conversation_id,
ConversationMessageModel.organization_id == actor.organization_id,
ConversationMessageModel.is_deleted == False,
)
)
# Filter by group_id if provided
if group_id:
query = query.where(MessageModel.group_id == group_id)
# Handle cursor-based pagination
if before:
# Get the position of the cursor message
cursor_query = select(ConversationMessageModel.position).where(
ConversationMessageModel.conversation_id == conversation_id,
ConversationMessageModel.message_id == before,
)
cursor_result = await session.execute(cursor_query)
cursor_position = cursor_result.scalar_one_or_none()
if cursor_position is not None:
query = query.where(ConversationMessageModel.position < cursor_position)
if after:
# Get the position of the cursor message
cursor_query = select(ConversationMessageModel.position).where(
ConversationMessageModel.conversation_id == conversation_id,
ConversationMessageModel.message_id == after,
)
cursor_result = await session.execute(cursor_query)
cursor_position = cursor_result.scalar_one_or_none()
if cursor_position is not None:
query = query.where(ConversationMessageModel.position > cursor_position)
# Order by position
if reverse:
query = query.order_by(ConversationMessageModel.position.desc())
else:
query = query.order_by(ConversationMessageModel.position.asc())
# Apply limit
if limit is not None:
query = query.limit(limit)
result = await session.execute(query)
messages = [msg.to_pydantic() for msg in result.scalars().all()]
# Convert to LettaMessages (reverse=False keeps sub-messages in natural order)
return PydanticMessage.to_letta_messages_from_list(
messages, reverse=False, include_err=include_err, text_is_assistant_message=True
)
# ==================== Isolated Blocks Methods ====================
async def _create_isolated_blocks(
self,
session,
conversation: ConversationModel,
agent_id: str,
isolated_block_labels: List[str],
actor: PydanticUser,
) -> List[str]:
"""Create conversation-specific copies of blocks for isolated labels.
Args:
session: The database session
conversation: The conversation model (must be created but not yet committed)
agent_id: The agent ID to get source blocks from
isolated_block_labels: List of block labels to isolate
actor: The user performing the action
Returns:
List of created block IDs
Raises:
LettaInvalidArgumentError: If a block label is not found on the agent
"""
# Get the agent with its blocks
agent = await AgentModel.read_async(db_session=session, identifier=agent_id, actor=actor)
# Map of label -> agent block
agent_blocks_by_label = {block.label: block for block in agent.core_memory}
created_block_ids = []
for label in isolated_block_labels:
if label not in agent_blocks_by_label:
raise LettaInvalidArgumentError(
f"Block with label '{label}' not found on agent '{agent_id}'",
argument_name="isolated_block_labels",
)
source_block = agent_blocks_by_label[label]
# Create a copy of the block with a new ID using Pydantic schema (which auto-generates ID)
new_block_pydantic = PydanticBlock(
label=source_block.label,
description=source_block.description,
value=source_block.value,
limit=source_block.limit,
metadata=source_block.metadata_,
read_only=source_block.read_only,
)
# Convert to ORM model
block_data = new_block_pydantic.model_dump(to_orm=True, exclude_none=True)
new_block = BlockModel(**block_data, organization_id=actor.organization_id)
await new_block.create_async(session, actor=actor)
# Create the junction table entry
blocks_conv = BlocksConversations(
conversation_id=conversation.id,
block_id=new_block.id,
block_label=label,
)
session.add(blocks_conv)
created_block_ids.append(new_block.id)
return created_block_ids
@enforce_types
@trace_method
async def get_isolated_blocks_for_conversation(
self,
conversation_id: str,
actor: PydanticUser,
) -> Dict[str, PydanticBlock]:
"""Get isolated blocks for a conversation, keyed by label.
Args:
conversation_id: The conversation ID
actor: The user performing the action
Returns:
Dictionary mapping block labels to their conversation-specific blocks
"""
async with db_registry.async_session() as session:
conversation = await ConversationModel.read_async(
db_session=session,
identifier=conversation_id,
actor=actor,
check_is_deleted=True,
)
return {block.label: block.to_pydantic() for block in conversation.isolated_blocks}
@enforce_types
@trace_method
async def apply_isolated_blocks_to_agent_state(
self,
agent_state: "AgentState",
conversation_id: str,
actor: PydanticUser,
) -> "AgentState":
"""Apply conversation-specific block overrides to an agent state.
This method modifies the agent_state.memory to replace blocks that have
conversation-specific isolated versions.
Args:
agent_state: The agent state to modify (will be modified in place)
conversation_id: The conversation ID to get isolated blocks from
actor: The user performing the action
Returns:
The modified agent state (same object, modified in place)
"""
from letta.schemas.memory import Memory
# Get conversation's isolated blocks
isolated_blocks = await self.get_isolated_blocks_for_conversation(
conversation_id=conversation_id,
actor=actor,
)
if not isolated_blocks:
return agent_state
# Override agent's blocks with conversation-specific blocks
memory_blocks = []
for block in agent_state.memory.blocks:
if block.label in isolated_blocks:
memory_blocks.append(isolated_blocks[block.label])
else:
memory_blocks.append(block)
# Create new Memory with overridden blocks
agent_state.memory = Memory(
blocks=memory_blocks,
file_blocks=agent_state.memory.file_blocks,
agent_type=agent_state.memory.agent_type,
git_enabled=agent_state.memory.git_enabled,
)
return agent_state