feat: add order_by and order params to /v1/conversations list endpoin… (#9599)
* feat: add order_by and order params to /v1/conversations list endpoint [LET-7628] Added sorting support to the conversations list endpoint, matching the pattern from /v1/agents. **API Changes:** - Added `order` query param: "asc" or "desc" (default: "desc") - Added `order_by` query param: "created_at" or "last_run_completion" (default: "created_at") **Implementation:** **created_at ordering:** - Simple ORDER BY on ConversationModel.created_at - No join required, fast query - Nulls not applicable (created_at always set) **last_run_completion ordering:** - LEFT JOIN with runs table using subquery - Subquery: MAX(completed_at) grouped by conversation_id - Uses OUTER JOIN so conversations with no runs are included - Nulls last ordering (conversations with no runs go to end) - Index on runs.conversation_id ensures performant join **Pagination:** - Cursor-based pagination with `after` parameter - Handles null values correctly for last_run_completion - For created_at: simple timestamp comparison - For last_run_completion: complex null-aware cursor logic **Performance:** - Existing index: `ix_runs_conversation_id` on runs table - Subquery with GROUP BY is efficient for this use case - OUTER JOIN ensures conversations without runs are included **Follows agents pattern:** - Same parameter names (order, order_by) - Same Literal types and defaults - Converts "asc"/"desc" to ascending boolean internally 🐾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> * chore: order --------- Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
committed by
Caren Thomas
parent
afbc416972
commit
f082fd5061
@@ -8786,6 +8786,32 @@
|
||||
"title": "Summary Search"
|
||||
},
|
||||
"description": "Search for text within conversation summaries"
|
||||
},
|
||||
{
|
||||
"name": "order",
|
||||
"in": "query",
|
||||
"required": false,
|
||||
"schema": {
|
||||
"enum": ["asc", "desc"],
|
||||
"type": "string",
|
||||
"description": "Sort order for conversations. 'asc' for oldest first, 'desc' for newest first",
|
||||
"default": "desc",
|
||||
"title": "Order"
|
||||
},
|
||||
"description": "Sort order for conversations. 'asc' for oldest first, 'desc' for newest first"
|
||||
},
|
||||
{
|
||||
"name": "order_by",
|
||||
"in": "query",
|
||||
"required": false,
|
||||
"schema": {
|
||||
"enum": ["created_at", "last_run_completion"],
|
||||
"type": "string",
|
||||
"description": "Field to sort by",
|
||||
"default": "created_at",
|
||||
"title": "Order By"
|
||||
},
|
||||
"description": "Field to sort by"
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
|
||||
@@ -66,17 +66,24 @@ async def list_conversations(
|
||||
limit: int = Query(50, description="Maximum number of conversations to return"),
|
||||
after: Optional[str] = Query(None, description="Cursor for pagination (conversation ID)"),
|
||||
summary_search: Optional[str] = Query(None, description="Search for text within conversation summaries"),
|
||||
order: Literal["asc", "desc"] = Query(
|
||||
"desc", description="Sort order for conversations. 'asc' for oldest first, 'desc' for newest first"
|
||||
),
|
||||
order_by: Literal["created_at", "last_run_completion"] = Query("created_at", description="Field to sort by"),
|
||||
server: SyncServer = Depends(get_letta_server),
|
||||
headers: HeaderParams = Depends(get_headers),
|
||||
):
|
||||
"""List all conversations for an agent (or all conversations if agent_id not provided)."""
|
||||
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
|
||||
ascending = order == "asc"
|
||||
return await conversation_manager.list_conversations(
|
||||
agent_id=agent_id,
|
||||
actor=actor,
|
||||
limit=limit,
|
||||
after=after,
|
||||
summary_search=summary_search,
|
||||
ascending=ascending,
|
||||
sort_by=order_by,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ if TYPE_CHECKING:
|
||||
pass
|
||||
|
||||
# Import AgentState outside TYPE_CHECKING for @enforce_types decorator
|
||||
from sqlalchemy import delete, func, select
|
||||
from sqlalchemy import and_, asc, delete, desc, func, nulls_last, or_, select
|
||||
|
||||
from letta.errors import LettaInvalidArgumentError
|
||||
from letta.orm.agent import Agent as AgentModel
|
||||
@@ -13,6 +13,7 @@ from letta.orm.blocks_conversations import BlocksConversations
|
||||
from letta.orm.conversation import Conversation as ConversationModel
|
||||
from letta.orm.conversation_messages import ConversationMessage as ConversationMessageModel
|
||||
from letta.orm.message import Message as MessageModel
|
||||
from letta.orm.run import Run as RunModel
|
||||
from letta.otel.tracing import trace_method
|
||||
from letta.schemas.agent import AgentState
|
||||
from letta.schemas.block import Block as PydanticBlock
|
||||
@@ -110,6 +111,8 @@ class ConversationManager:
|
||||
limit: int = 50,
|
||||
after: Optional[str] = None,
|
||||
summary_search: Optional[str] = None,
|
||||
ascending: bool = False,
|
||||
sort_by: str = "created_at",
|
||||
) -> List[PydanticConversation]:
|
||||
"""List conversations for an agent (or all conversations) with cursor-based pagination.
|
||||
|
||||
@@ -119,54 +122,139 @@ class ConversationManager:
|
||||
limit: Maximum number of conversations to return
|
||||
after: Cursor for pagination (conversation ID)
|
||||
summary_search: Optional text to search for within the summary field
|
||||
ascending: Sort order (True for oldest first, False for newest first)
|
||||
sort_by: Field to sort by ("created_at" or "last_run_completion")
|
||||
|
||||
Returns:
|
||||
List of conversations matching the criteria
|
||||
"""
|
||||
async with db_registry.async_session() as session:
|
||||
# If summary search is provided, use custom query
|
||||
if summary_search:
|
||||
from sqlalchemy import and_
|
||||
# Build base query with optional join for last_run_completion
|
||||
if sort_by == "last_run_completion":
|
||||
# Subquery to get the latest completed_at for each conversation
|
||||
latest_run_subquery = (
|
||||
select(
|
||||
RunModel.conversation_id,
|
||||
func.max(RunModel.completed_at).label("last_run_completion")
|
||||
)
|
||||
.where(RunModel.conversation_id.isnot(None))
|
||||
.group_by(RunModel.conversation_id)
|
||||
.subquery()
|
||||
)
|
||||
|
||||
# Join conversations with the subquery
|
||||
stmt = (
|
||||
select(ConversationModel)
|
||||
.outerjoin(
|
||||
latest_run_subquery,
|
||||
ConversationModel.id == latest_run_subquery.c.conversation_id
|
||||
)
|
||||
)
|
||||
sort_column = latest_run_subquery.c.last_run_completion
|
||||
sort_nulls_last = True
|
||||
else:
|
||||
# Simple query for created_at
|
||||
stmt = select(ConversationModel)
|
||||
sort_column = ConversationModel.created_at
|
||||
sort_nulls_last = False
|
||||
|
||||
# Build where conditions
|
||||
conditions = [
|
||||
ConversationModel.organization_id == actor.organization_id,
|
||||
ConversationModel.is_deleted == False,
|
||||
ConversationModel.summary.isnot(None),
|
||||
ConversationModel.summary.contains(summary_search),
|
||||
]
|
||||
|
||||
# Add agent_id filter if provided
|
||||
if agent_id is not None:
|
||||
conditions.append(ConversationModel.agent_id == agent_id)
|
||||
|
||||
stmt = select(ConversationModel).where(and_(*conditions)).order_by(ConversationModel.created_at.desc()).limit(limit)
|
||||
# Add summary search filter if provided
|
||||
if summary_search:
|
||||
conditions.extend([
|
||||
ConversationModel.summary.isnot(None),
|
||||
ConversationModel.summary.contains(summary_search),
|
||||
])
|
||||
|
||||
stmt = stmt.where(and_(*conditions))
|
||||
|
||||
# Handle cursor pagination
|
||||
if after:
|
||||
# Add cursor filtering
|
||||
# Get the sort value for the cursor conversation
|
||||
if sort_by == "last_run_completion":
|
||||
cursor_query = (
|
||||
select(
|
||||
ConversationModel.id,
|
||||
func.max(RunModel.completed_at).label("last_run_completion")
|
||||
)
|
||||
.outerjoin(RunModel, ConversationModel.id == RunModel.conversation_id)
|
||||
.where(ConversationModel.id == after)
|
||||
.group_by(ConversationModel.id)
|
||||
)
|
||||
result = (await session.execute(cursor_query)).first()
|
||||
if result:
|
||||
after_id, after_sort_value = result
|
||||
# Apply cursor filter
|
||||
if after_sort_value is None:
|
||||
# Cursor is at NULL - if ascending, get non-NULLs or NULLs with greater ID
|
||||
if ascending:
|
||||
stmt = stmt.where(
|
||||
or_(
|
||||
and_(sort_column.is_(None), ConversationModel.id > after_id),
|
||||
sort_column.isnot(None)
|
||||
)
|
||||
)
|
||||
else:
|
||||
# If descending, get NULLs with smaller ID
|
||||
stmt = stmt.where(
|
||||
and_(sort_column.is_(None), ConversationModel.id < after_id)
|
||||
)
|
||||
else:
|
||||
# Cursor is at non-NULL
|
||||
if ascending:
|
||||
# Moving forward: greater values or same value with greater ID
|
||||
stmt = stmt.where(
|
||||
and_(
|
||||
sort_column.isnot(None),
|
||||
or_(
|
||||
sort_column > after_sort_value,
|
||||
and_(sort_column == after_sort_value, ConversationModel.id > after_id)
|
||||
)
|
||||
)
|
||||
)
|
||||
else:
|
||||
# Moving backward: smaller values or NULLs or same value with smaller ID
|
||||
stmt = stmt.where(
|
||||
or_(
|
||||
sort_column.is_(None),
|
||||
sort_column < after_sort_value,
|
||||
and_(sort_column == after_sort_value, ConversationModel.id < after_id)
|
||||
)
|
||||
)
|
||||
else:
|
||||
# Simple created_at cursor
|
||||
after_conv = await ConversationModel.read_async(
|
||||
db_session=session,
|
||||
identifier=after,
|
||||
actor=actor,
|
||||
)
|
||||
if ascending:
|
||||
stmt = stmt.where(ConversationModel.created_at > after_conv.created_at)
|
||||
else:
|
||||
stmt = stmt.where(ConversationModel.created_at < after_conv.created_at)
|
||||
|
||||
# Apply ordering
|
||||
order_fn = asc if ascending else desc
|
||||
if sort_nulls_last:
|
||||
stmt = stmt.order_by(nulls_last(order_fn(sort_column)), order_fn(ConversationModel.id))
|
||||
else:
|
||||
stmt = stmt.order_by(order_fn(sort_column), order_fn(ConversationModel.id))
|
||||
|
||||
stmt = stmt.limit(limit)
|
||||
|
||||
result = await session.execute(stmt)
|
||||
conversations = result.scalars().all()
|
||||
return [conv.to_pydantic() for conv in conversations]
|
||||
|
||||
# Use default list logic
|
||||
conversations = await ConversationModel.list_async(
|
||||
db_session=session,
|
||||
actor=actor,
|
||||
agent_id=agent_id,
|
||||
is_deleted=False,
|
||||
limit=limit,
|
||||
after=after,
|
||||
ascending=False,
|
||||
)
|
||||
return [conv.to_pydantic() for conv in conversations]
|
||||
|
||||
@enforce_types
|
||||
@trace_method
|
||||
async def update_conversation(
|
||||
|
||||
Reference in New Issue
Block a user