diff --git a/fern/openapi.json b/fern/openapi.json index 344fef9a..e96ad63d 100644 --- a/fern/openapi.json +++ b/fern/openapi.json @@ -8786,6 +8786,32 @@ "title": "Summary Search" }, "description": "Search for text within conversation summaries" + }, + { + "name": "order", + "in": "query", + "required": false, + "schema": { + "enum": ["asc", "desc"], + "type": "string", + "description": "Sort order for conversations. 'asc' for oldest first, 'desc' for newest first", + "default": "desc", + "title": "Order" + }, + "description": "Sort order for conversations. 'asc' for oldest first, 'desc' for newest first" + }, + { + "name": "order_by", + "in": "query", + "required": false, + "schema": { + "enum": ["created_at", "last_run_completion"], + "type": "string", + "description": "Field to sort by", + "default": "created_at", + "title": "Order By" + }, + "description": "Field to sort by" } ], "responses": { diff --git a/letta/server/rest_api/routers/v1/conversations.py b/letta/server/rest_api/routers/v1/conversations.py index f52e57e5..d369e567 100644 --- a/letta/server/rest_api/routers/v1/conversations.py +++ b/letta/server/rest_api/routers/v1/conversations.py @@ -66,17 +66,24 @@ async def list_conversations( limit: int = Query(50, description="Maximum number of conversations to return"), after: Optional[str] = Query(None, description="Cursor for pagination (conversation ID)"), summary_search: Optional[str] = Query(None, description="Search for text within conversation summaries"), + order: Literal["asc", "desc"] = Query( + "desc", description="Sort order for conversations. 'asc' for oldest first, 'desc' for newest first" + ), + order_by: Literal["created_at", "last_run_completion"] = Query("created_at", description="Field to sort by"), server: SyncServer = Depends(get_letta_server), headers: HeaderParams = Depends(get_headers), ): """List all conversations for an agent (or all conversations if agent_id not provided).""" actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) + ascending = order == "asc" return await conversation_manager.list_conversations( agent_id=agent_id, actor=actor, limit=limit, after=after, summary_search=summary_search, + ascending=ascending, + sort_by=order_by, ) diff --git a/letta/services/conversation_manager.py b/letta/services/conversation_manager.py index ee0cffd1..3b95a2e6 100644 --- a/letta/services/conversation_manager.py +++ b/letta/services/conversation_manager.py @@ -4,7 +4,7 @@ if TYPE_CHECKING: pass # Import AgentState outside TYPE_CHECKING for @enforce_types decorator -from sqlalchemy import delete, func, select +from sqlalchemy import and_, asc, delete, desc, func, nulls_last, or_, select from letta.errors import LettaInvalidArgumentError from letta.orm.agent import Agent as AgentModel @@ -13,6 +13,7 @@ from letta.orm.blocks_conversations import BlocksConversations from letta.orm.conversation import Conversation as ConversationModel from letta.orm.conversation_messages import ConversationMessage as ConversationMessageModel from letta.orm.message import Message as MessageModel +from letta.orm.run import Run as RunModel from letta.otel.tracing import trace_method from letta.schemas.agent import AgentState from letta.schemas.block import Block as PydanticBlock @@ -110,6 +111,8 @@ class ConversationManager: limit: int = 50, after: Optional[str] = None, summary_search: Optional[str] = None, + ascending: bool = False, + sort_by: str = "created_at", ) -> List[PydanticConversation]: """List conversations for an agent (or all conversations) with cursor-based pagination. @@ -119,52 +122,137 @@ class ConversationManager: limit: Maximum number of conversations to return after: Cursor for pagination (conversation ID) summary_search: Optional text to search for within the summary field + ascending: Sort order (True for oldest first, False for newest first) + sort_by: Field to sort by ("created_at" or "last_run_completion") Returns: List of conversations matching the criteria """ async with db_registry.async_session() as session: - # If summary search is provided, use custom query - if summary_search: - from sqlalchemy import and_ + # Build base query with optional join for last_run_completion + if sort_by == "last_run_completion": + # Subquery to get the latest completed_at for each conversation + latest_run_subquery = ( + select( + RunModel.conversation_id, + func.max(RunModel.completed_at).label("last_run_completion") + ) + .where(RunModel.conversation_id.isnot(None)) + .group_by(RunModel.conversation_id) + .subquery() + ) - # Build where conditions - conditions = [ - ConversationModel.organization_id == actor.organization_id, - ConversationModel.is_deleted == False, + # Join conversations with the subquery + stmt = ( + select(ConversationModel) + .outerjoin( + latest_run_subquery, + ConversationModel.id == latest_run_subquery.c.conversation_id + ) + ) + sort_column = latest_run_subquery.c.last_run_completion + sort_nulls_last = True + else: + # Simple query for created_at + stmt = select(ConversationModel) + sort_column = ConversationModel.created_at + sort_nulls_last = False + + # Build where conditions + conditions = [ + ConversationModel.organization_id == actor.organization_id, + ConversationModel.is_deleted == False, + ] + + # Add agent_id filter if provided + if agent_id is not None: + conditions.append(ConversationModel.agent_id == agent_id) + + # Add summary search filter if provided + if summary_search: + conditions.extend([ ConversationModel.summary.isnot(None), ConversationModel.summary.contains(summary_search), - ] + ]) - # Add agent_id filter if provided - if agent_id is not None: - conditions.append(ConversationModel.agent_id == agent_id) + stmt = stmt.where(and_(*conditions)) - stmt = select(ConversationModel).where(and_(*conditions)).order_by(ConversationModel.created_at.desc()).limit(limit) - - if after: - # Add cursor filtering + # Handle cursor pagination + if after: + # Get the sort value for the cursor conversation + if sort_by == "last_run_completion": + cursor_query = ( + select( + ConversationModel.id, + func.max(RunModel.completed_at).label("last_run_completion") + ) + .outerjoin(RunModel, ConversationModel.id == RunModel.conversation_id) + .where(ConversationModel.id == after) + .group_by(ConversationModel.id) + ) + result = (await session.execute(cursor_query)).first() + if result: + after_id, after_sort_value = result + # Apply cursor filter + if after_sort_value is None: + # Cursor is at NULL - if ascending, get non-NULLs or NULLs with greater ID + if ascending: + stmt = stmt.where( + or_( + and_(sort_column.is_(None), ConversationModel.id > after_id), + sort_column.isnot(None) + ) + ) + else: + # If descending, get NULLs with smaller ID + stmt = stmt.where( + and_(sort_column.is_(None), ConversationModel.id < after_id) + ) + else: + # Cursor is at non-NULL + if ascending: + # Moving forward: greater values or same value with greater ID + stmt = stmt.where( + and_( + sort_column.isnot(None), + or_( + sort_column > after_sort_value, + and_(sort_column == after_sort_value, ConversationModel.id > after_id) + ) + ) + ) + else: + # Moving backward: smaller values or NULLs or same value with smaller ID + stmt = stmt.where( + or_( + sort_column.is_(None), + sort_column < after_sort_value, + and_(sort_column == after_sort_value, ConversationModel.id < after_id) + ) + ) + else: + # Simple created_at cursor after_conv = await ConversationModel.read_async( db_session=session, identifier=after, actor=actor, ) - stmt = stmt.where(ConversationModel.created_at < after_conv.created_at) + if ascending: + stmt = stmt.where(ConversationModel.created_at > after_conv.created_at) + else: + stmt = stmt.where(ConversationModel.created_at < after_conv.created_at) - result = await session.execute(stmt) - conversations = result.scalars().all() - return [conv.to_pydantic() for conv in conversations] + # Apply ordering + order_fn = asc if ascending else desc + if sort_nulls_last: + stmt = stmt.order_by(nulls_last(order_fn(sort_column)), order_fn(ConversationModel.id)) + else: + stmt = stmt.order_by(order_fn(sort_column), order_fn(ConversationModel.id)) - # Use default list logic - conversations = await ConversationModel.list_async( - db_session=session, - actor=actor, - agent_id=agent_id, - is_deleted=False, - limit=limit, - after=after, - ascending=False, - ) + stmt = stmt.limit(limit) + + result = await session.execute(stmt) + conversations = result.scalars().all() return [conv.to_pydantic() for conv in conversations] @enforce_types