feat: add new internal capture endpoint (#5750)

This commit is contained in:
cthomas
2025-10-25 17:42:48 -07:00
committed by Caren Thomas
parent b15b04cec0
commit cd2fb0cd02
3 changed files with 70 additions and 7 deletions

View File

@@ -6,7 +6,7 @@ from letta.orm.group import Group
from letta.orm.user import User
from letta.schemas.agent import AgentState
from letta.schemas.group import ManagerType
from letta.schemas.letta_message_content import ImageContent, TextContent
from letta.schemas.letta_message_content import ImageContent, ReasoningContent, TextContent
from letta.schemas.message import Message
from letta.services.mcp.base_client import AsyncBaseMCPClient

View File

@@ -59,8 +59,8 @@ class SleeptimeMultiAgentV4(LettaAgentV3):
request_start_timestamp_ns=request_start_timestamp_ns,
)
await self.run_sleeptime_agents()
response.usage.run_ids = self.run_ids
run_ids = await self.run_sleeptime_agents()
response.usage.run_ids = run_ids
return response
@trace_method
@@ -94,7 +94,7 @@ class SleeptimeMultiAgentV4(LettaAgentV3):
await self.run_sleeptime_agents()
@trace_method
async def run_sleeptime_agents(self):
async def run_sleeptime_agents(self) -> list[str]:
# Get response messages
last_response_messages = self.response_messages
@@ -122,6 +122,7 @@ class SleeptimeMultiAgentV4(LettaAgentV3):
# Individual task failures
print(f"Sleeptime agent processing failed: {e!s}")
raise e
return self.run_ids
@trace_method
async def _issue_background_task(

View File

@@ -25,7 +25,8 @@ from letta.errors import (
AgentNotFoundForExportError,
PendingApprovalError,
)
from letta.helpers.datetime_helpers import get_utc_timestamp_ns
from letta.groups.sleeptime_multi_agent_v4 import SleeptimeMultiAgentV4
from letta.helpers.datetime_helpers import get_utc_time, get_utc_timestamp_ns
from letta.log import get_logger
from letta.orm.errors import NoResultFound
from letta.otel.context import get_ctx_attributes
@@ -33,11 +34,12 @@ from letta.otel.metric_registry import MetricRegistry
from letta.schemas.agent import AgentRelationships, AgentState, CreateAgent, UpdateAgent
from letta.schemas.agent_file import AgentFileSchema
from letta.schemas.block import BaseBlock, Block, BlockResponse, BlockUpdate
from letta.schemas.enums import AgentType, RunStatus
from letta.schemas.enums import AgentType, MessageRole, RunStatus
from letta.schemas.file import AgentFileAttachment, FileMetadataBase, PaginatedAgentFiles
from letta.schemas.group import Group
from letta.schemas.job import LettaRequestConfig
from letta.schemas.letta_message import LettaMessageUnion, LettaMessageUpdateUnion, MessageType
from letta.schemas.letta_message_content import TextContent
from letta.schemas.letta_request import LettaAsyncRequest, LettaRequest, LettaStreamingRequest
from letta.schemas.letta_response import LettaResponse
from letta.schemas.letta_stop_reason import StopReasonType
@@ -48,7 +50,7 @@ from letta.schemas.memory import (
CreateArchivalMemory,
Memory,
)
from letta.schemas.message import BaseMessage, MessageCreate, MessageCreateType, MessageSearchRequest, MessageSearchResult
from letta.schemas.message import Message, MessageCreate, MessageCreateType, MessageSearchRequest, MessageSearchResult
from letta.schemas.passage import Passage
from letta.schemas.run import Run as PydanticRun, RunUpdate
from letta.schemas.source import BaseSource, Source
@@ -1902,3 +1904,63 @@ async def summarize_messages(
status_code=status.HTTP_403_FORBIDDEN,
detail="Summarization is not currently supported for this agent configuration. Please contact Letta support.",
)
class CaptureMessagesRequest(BaseModel):
provider: str
model: str
request_messages: list[dict[str, Any]]
response_dict: dict[str, Any]
@router.post("/{agent_id}/messages/capture", response_model=str, operation_id="capture_messages", include_in_schema=False)
async def capture_messages(
agent_id: AgentId,
request: CaptureMessagesRequest = Body(...),
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Capture a list of messages for an agent.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
agent = await server.agent_manager.get_agent_by_id_async(agent_id, actor, include_relationships=["multi_agent_group"])
messages_to_persist = []
# Input user messages
for message in request.request_messages:
if message["role"] == "user":
messages_to_persist.append(
Message(
role=MessageRole.user,
content=[(TextContent(text=message["content"]))],
agent_id=agent_id,
tool_calls=None,
tool_call_id=None,
created_at=get_utc_time(),
)
)
# Assistant response
messages_to_persist.append(
Message(
role=MessageRole.assistant,
content=[(TextContent(text=request.response_dict["content"]))],
agent_id=agent_id,
model=request.model,
tool_calls=None,
tool_call_id=None,
created_at=get_utc_time(),
)
)
response_messages = await server.message_manager.create_many_messages_async(messages_to_persist, actor=actor)
sleeptime_group = agent.multi_agent_group if agent.multi_agent_group and agent.multi_agent_group.manager_type == "sleeptime" else None
if sleeptime_group:
sleeptime_agent_loop = SleeptimeMultiAgentV4(agent_state=agent, actor=actor, group=sleeptime_group)
sleeptime_agent_loop.response_messages = response_messages
run_ids = await sleeptime_agent_loop.run_sleeptime_agents()
return JSONResponse({"success": True, "messages_created": len(response_messages), "run_ids": run_ids})