diff --git a/letta/groups/helpers.py b/letta/groups/helpers.py index 5192fb36..3121de3b 100644 --- a/letta/groups/helpers.py +++ b/letta/groups/helpers.py @@ -6,7 +6,7 @@ from letta.orm.group import Group from letta.orm.user import User from letta.schemas.agent import AgentState from letta.schemas.group import ManagerType -from letta.schemas.letta_message_content import ImageContent, TextContent +from letta.schemas.letta_message_content import ImageContent, ReasoningContent, TextContent from letta.schemas.message import Message from letta.services.mcp.base_client import AsyncBaseMCPClient diff --git a/letta/groups/sleeptime_multi_agent_v4.py b/letta/groups/sleeptime_multi_agent_v4.py index 0e409aab..6580b432 100644 --- a/letta/groups/sleeptime_multi_agent_v4.py +++ b/letta/groups/sleeptime_multi_agent_v4.py @@ -59,8 +59,8 @@ class SleeptimeMultiAgentV4(LettaAgentV3): request_start_timestamp_ns=request_start_timestamp_ns, ) - await self.run_sleeptime_agents() - response.usage.run_ids = self.run_ids + run_ids = await self.run_sleeptime_agents() + response.usage.run_ids = run_ids return response @trace_method @@ -94,7 +94,7 @@ class SleeptimeMultiAgentV4(LettaAgentV3): await self.run_sleeptime_agents() @trace_method - async def run_sleeptime_agents(self): + async def run_sleeptime_agents(self) -> list[str]: # Get response messages last_response_messages = self.response_messages @@ -122,6 +122,7 @@ class SleeptimeMultiAgentV4(LettaAgentV3): # Individual task failures print(f"Sleeptime agent processing failed: {e!s}") raise e + return self.run_ids @trace_method async def _issue_background_task( diff --git a/letta/server/rest_api/routers/v1/agents.py b/letta/server/rest_api/routers/v1/agents.py index d77238c0..380404c3 100644 --- a/letta/server/rest_api/routers/v1/agents.py +++ b/letta/server/rest_api/routers/v1/agents.py @@ -25,7 +25,8 @@ from letta.errors import ( AgentNotFoundForExportError, PendingApprovalError, ) -from letta.helpers.datetime_helpers import get_utc_timestamp_ns +from letta.groups.sleeptime_multi_agent_v4 import SleeptimeMultiAgentV4 +from letta.helpers.datetime_helpers import get_utc_time, get_utc_timestamp_ns from letta.log import get_logger from letta.orm.errors import NoResultFound from letta.otel.context import get_ctx_attributes @@ -33,11 +34,12 @@ from letta.otel.metric_registry import MetricRegistry from letta.schemas.agent import AgentRelationships, AgentState, CreateAgent, UpdateAgent from letta.schemas.agent_file import AgentFileSchema from letta.schemas.block import BaseBlock, Block, BlockResponse, BlockUpdate -from letta.schemas.enums import AgentType, RunStatus +from letta.schemas.enums import AgentType, MessageRole, RunStatus from letta.schemas.file import AgentFileAttachment, FileMetadataBase, PaginatedAgentFiles from letta.schemas.group import Group from letta.schemas.job import LettaRequestConfig from letta.schemas.letta_message import LettaMessageUnion, LettaMessageUpdateUnion, MessageType +from letta.schemas.letta_message_content import TextContent from letta.schemas.letta_request import LettaAsyncRequest, LettaRequest, LettaStreamingRequest from letta.schemas.letta_response import LettaResponse from letta.schemas.letta_stop_reason import StopReasonType @@ -48,7 +50,7 @@ from letta.schemas.memory import ( CreateArchivalMemory, Memory, ) -from letta.schemas.message import BaseMessage, MessageCreate, MessageCreateType, MessageSearchRequest, MessageSearchResult +from letta.schemas.message import Message, MessageCreate, MessageCreateType, MessageSearchRequest, MessageSearchResult from letta.schemas.passage import Passage from letta.schemas.run import Run as PydanticRun, RunUpdate from letta.schemas.source import BaseSource, Source @@ -1902,3 +1904,63 @@ async def summarize_messages( status_code=status.HTTP_403_FORBIDDEN, detail="Summarization is not currently supported for this agent configuration. Please contact Letta support.", ) + + +class CaptureMessagesRequest(BaseModel): + provider: str + model: str + request_messages: list[dict[str, Any]] + response_dict: dict[str, Any] + + +@router.post("/{agent_id}/messages/capture", response_model=str, operation_id="capture_messages", include_in_schema=False) +async def capture_messages( + agent_id: AgentId, + request: CaptureMessagesRequest = Body(...), + server: "SyncServer" = Depends(get_letta_server), + headers: HeaderParams = Depends(get_headers), +): + """ + Capture a list of messages for an agent. + """ + actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) + agent = await server.agent_manager.get_agent_by_id_async(agent_id, actor, include_relationships=["multi_agent_group"]) + + messages_to_persist = [] + + # Input user messages + for message in request.request_messages: + if message["role"] == "user": + messages_to_persist.append( + Message( + role=MessageRole.user, + content=[(TextContent(text=message["content"]))], + agent_id=agent_id, + tool_calls=None, + tool_call_id=None, + created_at=get_utc_time(), + ) + ) + + # Assistant response + messages_to_persist.append( + Message( + role=MessageRole.assistant, + content=[(TextContent(text=request.response_dict["content"]))], + agent_id=agent_id, + model=request.model, + tool_calls=None, + tool_call_id=None, + created_at=get_utc_time(), + ) + ) + + response_messages = await server.message_manager.create_many_messages_async(messages_to_persist, actor=actor) + + sleeptime_group = agent.multi_agent_group if agent.multi_agent_group and agent.multi_agent_group.manager_type == "sleeptime" else None + if sleeptime_group: + sleeptime_agent_loop = SleeptimeMultiAgentV4(agent_state=agent, actor=actor, group=sleeptime_group) + sleeptime_agent_loop.response_messages = response_messages + run_ids = await sleeptime_agent_loop.run_sleeptime_agents() + + return JSONResponse({"success": True, "messages_created": len(response_messages), "run_ids": run_ids})