diff --git a/fern/openapi-overrides.yml b/fern/openapi-overrides.yml index cc7bf153..60c5106e 100644 --- a/fern/openapi-overrides.yml +++ b/fern/openapi-overrides.yml @@ -615,6 +615,7 @@ paths: - $ref: '#/components/schemas/SummaryMessage' - $ref: '#/components/schemas/EventMessage' - $ref: '#/components/schemas/LettaPing' + - $ref: '#/components/schemas/LettaErrorMessage' - $ref: '#/components/schemas/LettaStopReason' - $ref: '#/components/schemas/LettaUsageStatistics' /v1/agents/{agent_id}/messages/cancel: diff --git a/fern/openapi.json b/fern/openapi.json index 55c2267e..43705af5 100644 --- a/fern/openapi.json +++ b/fern/openapi.json @@ -14373,6 +14373,9 @@ { "$ref": "#/components/schemas/LettaPing" }, + { + "$ref": "#/components/schemas/LettaErrorMessage" + }, { "$ref": "#/components/schemas/LettaStopReason" }, @@ -29007,6 +29010,41 @@ "required": ["agent_id"], "title": "LettaBatchRequest" }, + "LettaErrorMessage": { + "properties": { + "message_type": { + "type": "string", + "const": "error_message", + "title": "Message Type", + "description": "The type of the message.", + "default": "error_message" + }, + "run_id": { + "type": "string", + "title": "Run ID", + "description": "The ID of the run." + }, + "error_type": { + "type": "string", + "title": "Error Type", + "description": "The type of error." + }, + "message": { + "type": "string", + "title": "Message", + "description": "The error message." + }, + "detail": { + "type": "string", + "title": "Detail", + "description": "An optional error detail." + } + }, + "type": "object", + "required": ["message_type", "run_id", "error_type", "message"], + "title": "LettaErrorMessage", + "description": "Error messages are used to notify the client of an error that occurred during the agent's execution." 
+ }, "LettaImage": { "properties": { "type": { @@ -29480,6 +29518,9 @@ { "$ref": "#/components/schemas/LettaPing" }, + { + "$ref": "#/components/schemas/LettaErrorMessage" + }, { "$ref": "#/components/schemas/LettaStopReason" }, @@ -29495,6 +29536,7 @@ "approval_request_message": "#/components/schemas/ApprovalRequestMessage", "approval_response_message": "#/components/schemas/ApprovalResponseMessage", "assistant_message": "#/components/schemas/AssistantMessage", + "error_message": "#/components/schemas/LettaErrorMessage", "hidden_reasoning_message": "#/components/schemas/HiddenReasoningMessage", "ping": "#/components/schemas/LettaPing", "reasoning_message": "#/components/schemas/ReasoningMessage", diff --git a/letta/__init__.py b/letta/__init__.py index 238d6c20..3469c726 100644 --- a/letta/__init__.py +++ b/letta/__init__.py @@ -28,7 +28,7 @@ from letta.schemas.embedding_config import EmbeddingConfig from letta.schemas.enums import JobStatus from letta.schemas.file import FileMetadata from letta.schemas.job import Job -from letta.schemas.letta_message import LettaMessage, LettaPing +from letta.schemas.letta_message import LettaErrorMessage, LettaMessage, LettaPing from letta.schemas.letta_stop_reason import LettaStopReason from letta.schemas.llm_config import LLMConfig from letta.schemas.memory import ArchivalMemorySummary, BasicBlockMemory, ChatMemory, Memory, RecallMemorySummary diff --git a/letta/agents/letta_agent_v3.py b/letta/agents/letta_agent_v3.py index 8b224924..b0fc3247 100644 --- a/letta/agents/letta_agent_v3.py +++ b/letta/agents/letta_agent_v3.py @@ -28,7 +28,7 @@ from letta.local_llm.constants import INNER_THOUGHTS_KWARG from letta.otel.tracing import trace_method from letta.schemas.agent import AgentState from letta.schemas.enums import MessageRole -from letta.schemas.letta_message import ApprovalReturn, LettaMessage, MessageType +from letta.schemas.letta_message import ApprovalReturn, LettaErrorMessage, LettaMessage, MessageType from 
letta.schemas.letta_message_content import OmittedReasoningContent, ReasoningContent, RedactedReasoningContent, TextContent from letta.schemas.letta_response import LettaResponse from letta.schemas.letta_stop_reason import LettaStopReason, StopReasonType @@ -319,14 +319,13 @@ class LettaAgentV3(LettaAgentV2): yield f"data: {self.stop_reason.model_dump_json()}\n\n" # Mid-stream error: yield error event to client in SSE format - error_chunk = { - "error": { - "type": "internal_error", - "message": "An error occurred during agent execution.", - "detail": str(e), - } - } - yield f"event: error\ndata: {json.dumps(error_chunk)}\n\n" + error_message = LettaErrorMessage( + run_id=run_id, + error_type="internal_error", + message="An error occurred during agent execution.", + detail=str(e), + ) + yield f"event: error\ndata: {error_message.model_dump_json()}\n\n" # Return immediately - don't fall through to finish chunks # This prevents sending end_turn finish chunks after an error @@ -360,15 +359,16 @@ class LettaAgentV3(LettaAgentV2): if self.stop_reason is None: self.stop_reason = LettaStopReason(stop_reason=StopReasonType.error.value) + yield f"data: {self.stop_reason.model_dump_json()}\n\n" + # Send error event - error_chunk = { - "error": { - "type": "cleanup_error", - "message": "An error occurred during stream finalization.", - "detail": str(cleanup_error), - } - } - yield f"event: error\ndata: {json.dumps(error_chunk)}\n\n" + error_message = LettaErrorMessage( + run_id=run_id, + error_type="cleanup_error", + message="An error occurred during stream finalization.", + detail=str(cleanup_error), + ) + yield f"event: error\ndata: {error_message.model_dump_json()}\n\n" # Note: we don't send finish chunks here since we already errored @trace_method diff --git a/letta/schemas/letta_message.py b/letta/schemas/letta_message.py index 023d2a91..5f69d774 100644 --- a/letta/schemas/letta_message.py +++ b/letta/schemas/letta_message.py @@ -368,6 +368,24 @@ class 
LettaPing(LettaMessage): ) +class LettaErrorMessage(BaseModel): + """ + Message returning any error that occurred during the agent's execution, mid SSE stream. + + Args: + run_id (str): The ID of the run + error_type (str): The type of error + message (str): The error message + detail (Optional[str]): An optional error detail + """ + + message_type: Literal["error_message"] = "error_message" + run_id: str + error_type: str + message: str + detail: Optional[str] = None + + class SummaryMessage(LettaMessage): """ A message representing a summary of the conversation. Sent to the LLM as a user or system message depending on the provider. @@ -458,6 +476,44 @@ def create_letta_ping_schema(): } +def create_letta_error_message_schema(): + return { + "properties": { + "message_type": { + "type": "string", + "const": "error_message", + "title": "Message Type", + "description": "The type of the message.", + "default": "error_message", + }, + "run_id": { + "type": "string", + "title": "Run ID", + "description": "The ID of the run.", + }, + "error_type": { + "type": "string", + "title": "Error Type", + "description": "The type of error.", + }, + "message": { + "type": "string", + "title": "Message", + "description": "The error message.", + }, + "detail": { + "type": "string", + "title": "Detail", + "description": "An optional error detail.", + }, + }, + "type": "object", + "required": ["message_type", "run_id", "error_type", "message"], + "title": "LettaErrorMessage", + "description": "Error messages are used to notify the client of an error that occurred during the agent's execution.", + } + + # -------------------------- # Message Update API Schemas # -------------------------- diff --git a/letta/schemas/letta_response.py b/letta/schemas/letta_response.py index 1a37fba4..68ac2dc3 100644 --- a/letta/schemas/letta_response.py +++ b/letta/schemas/letta_response.py @@ -13,6 +13,7 @@ from letta.schemas.letta_message import ( ApprovalResponseMessage, AssistantMessage, 
HiddenReasoningMessage, + LettaErrorMessage, LettaMessage, LettaMessageUnion, LettaPing, @@ -201,6 +202,7 @@ class LettaStreamingResponse(RootModel): ApprovalRequestMessage, ApprovalResponseMessage, LettaPing, + LettaErrorMessage, LettaStopReason, LettaUsageStatistics, ] = Field(..., discriminator="message_type") diff --git a/letta/server/rest_api/app.py b/letta/server/rest_api/app.py index 84dda5d5..21bb9e0c 100644 --- a/letta/server/rest_api/app.py +++ b/letta/server/rest_api/app.py @@ -63,7 +63,7 @@ from letta.jobs.scheduler import start_scheduler_with_leader_election from letta.log import get_logger from letta.orm.errors import DatabaseTimeoutError, ForeignKeyConstraintViolationError, NoResultFound, UniqueConstraintViolationError from letta.otel.tracing import get_trace_id -from letta.schemas.letta_message import create_letta_message_union_schema, create_letta_ping_schema +from letta.schemas.letta_message import create_letta_error_message_schema, create_letta_message_union_schema, create_letta_ping_schema from letta.schemas.letta_message_content import ( create_letta_assistant_message_content_union_schema, create_letta_message_content_union_schema, @@ -110,6 +110,7 @@ def generate_openapi_schema(app: FastAPI): letta_docs["components"]["schemas"]["LettaAssistantMessageContentUnion"] = create_letta_assistant_message_content_union_schema() letta_docs["components"]["schemas"]["LettaUserMessageContentUnion"] = create_letta_user_message_content_union_schema() letta_docs["components"]["schemas"]["LettaPing"] = create_letta_ping_schema() + letta_docs["components"]["schemas"]["LettaErrorMessage"] = create_letta_error_message_schema() # Update the app's schema with our modified version app.openapi_schema = letta_docs diff --git a/letta/server/rest_api/redis_stream_manager.py b/letta/server/rest_api/redis_stream_manager.py index 5728e19a..2e7625e1 100644 --- a/letta/server/rest_api/redis_stream_manager.py +++ b/letta/server/rest_api/redis_stream_manager.py @@ -9,7 +9,8 @@ 
from typing import AsyncIterator, Dict, List, Optional from letta.data_sources.redis_client import AsyncRedisClient from letta.log import get_logger from letta.schemas.enums import RunStatus -from letta.schemas.letta_stop_reason import StopReasonType +from letta.schemas.letta_message import LettaErrorMessage +from letta.schemas.letta_stop_reason import LettaStopReason, StopReasonType from letta.schemas.run import RunUpdate from letta.schemas.user import User from letta.server.rest_api.streaming_response import RunCancelledException @@ -266,8 +267,14 @@ async def create_background_stream_processor( saw_done = True else: # No stop_reason and no terminal - this is an error condition - error_chunk = {"error": "Stream ended unexpectedly without stop_reason", "code": "STREAM_INCOMPLETE"} - await writer.write_chunk(run_id=run_id, data=f"event: error\ndata: {json.dumps(error_chunk)}\n\n", is_complete=False) + error_message = LettaErrorMessage( + run_id=run_id, + error_type="stream_incomplete", + message="Stream ended unexpectedly without stop_reason.", + detail=None, + ) + await writer.write_chunk(run_id=run_id, data=f"data: {LettaStopReason(stop_reason=StopReasonType.error).model_dump_json()}\n\n", is_complete=False) + await writer.write_chunk(run_id=run_id, data=f"event: error\ndata: {error_message.model_dump_json()}\n\n", is_complete=False) await writer.write_chunk(run_id=run_id, data="data: [DONE]\n\n", is_complete=True) saw_error = True saw_done = True @@ -284,8 +291,17 @@ async def create_background_stream_processor( except Exception as e: logger.error(f"Error processing stream for run {run_id}: {e}") # Write error chunk - error_chunk = {"error": str(e), "code": "INTERNAL_SERVER_ERROR"} - await writer.write_chunk(run_id=run_id, data=f"event: error\ndata: {json.dumps(error_chunk)}\n\n", is_complete=False) + stop_reason = StopReasonType.error.value + error_message = LettaErrorMessage( + run_id=run_id, + error_type="internal_error", + message="An unknown error occurred with the LLM streaming request.", + detail=str(e), + ) + await writer.write_chunk( + run_id=run_id, data=f"data: 
{LettaStopReason(stop_reason=stop_reason).model_dump_json()}\n\n", is_complete=False + ) + await writer.write_chunk(run_id=run_id, data=f"event: error\ndata: {error_message.model_dump_json()}\n\n", is_complete=False) await writer.write_chunk(run_id=run_id, data="data: [DONE]\n\n", is_complete=True) saw_error = True saw_done = True diff --git a/letta/server/rest_api/routers/v1/runs.py b/letta/server/rest_api/routers/v1/runs.py index 0a3063a0..35669d85 100644 --- a/letta/server/rest_api/routers/v1/runs.py +++ b/letta/server/rest_api/routers/v1/runs.py @@ -282,6 +282,7 @@ async def delete_run( {"$ref": "#/components/schemas/ApprovalRequestMessage"}, {"$ref": "#/components/schemas/ApprovalResponseMessage"}, {"$ref": "#/components/schemas/LettaPing"}, + {"$ref": "#/components/schemas/LettaErrorMessage"}, {"$ref": "#/components/schemas/LettaStopReason"}, {"$ref": "#/components/schemas/LettaUsageStatistics"}, ] diff --git a/letta/services/streaming_service.py b/letta/services/streaming_service.py index dd9240d6..c1cbc430 100644 --- a/letta/services/streaming_service.py +++ b/letta/services/streaming_service.py @@ -27,7 +27,7 @@ from letta.otel.metric_registry import MetricRegistry from letta.schemas.agent import AgentState from letta.schemas.enums import AgentType, MessageStreamStatus, RunStatus from letta.schemas.job import LettaRequestConfig -from letta.schemas.letta_message import AssistantMessage, MessageType +from letta.schemas.letta_message import AssistantMessage, LettaErrorMessage, MessageType from letta.schemas.letta_message_content import TextContent from letta.schemas.letta_request import LettaStreamingRequest from letta.schemas.letta_response import LettaResponse @@ -331,19 +331,20 @@ class StreamingService: f"Stream for run {run_id} ended without terminal event. " f"Agent stop_reason: {agent_loop.stop_reason}. Emitting error + [DONE]." 
) - error_chunk = { - "error": { - "type": "stream_incomplete", - "message": "Stream ended unexpectedly without a terminal event.", - "detail": None, - } - } - yield f"event: error\ndata: {json.dumps(error_chunk)}\n\n" + stop_reason = LettaStopReason(stop_reason=StopReasonType.error) + error_message = LettaErrorMessage( + run_id=run_id, + error_type="stream_incomplete", + message="Stream ended unexpectedly without a terminal event.", + detail=None, + ) + yield f"data: {stop_reason.model_dump_json()}\n\n" + yield f"event: error\ndata: {error_message.model_dump_json()}\n\n" yield "data: [DONE]\n\n" saw_error = True saw_done = True run_status = RunStatus.failed - stop_reason = StopReasonType.error + else: # set run status after successful completion if agent_loop.stop_reason and agent_loop.stop_reason.stop_reason.value == "cancelled": @@ -354,60 +355,72 @@ class StreamingService: except LLMTimeoutError as e: run_status = RunStatus.failed - error_data = {"error": {"type": "llm_timeout", "message": "The LLM request timed out. Please try again.", "detail": str(e)}} - stop_reason = StopReasonType.llm_api_error - logger.error(f"Run {run_id} stopped with LLM timeout error: {e}, error_data: {error_data}") - yield f"event: error\ndata: {json.dumps(error_data)}\n\n" + stop_reason = LettaStopReason(stop_reason=StopReasonType.llm_api_error) + error_message = LettaErrorMessage( + run_id=run_id, + error_type="llm_timeout", + message="The LLM request timed out. Please try again.", + detail=str(e), + ) + logger.error(f"Run {run_id} stopped with LLM timeout error: {e}, error_data: {error_message.model_dump()}") + yield f"data: {stop_reason.model_dump_json()}\n\n" + yield f"event: error\ndata: {error_message.model_dump_json()}\n\n" # Send [DONE] marker to properly close the stream yield "data: [DONE]\n\n" except LLMRateLimitError as e: run_status = RunStatus.failed - error_data = { - "error": { - "type": "llm_rate_limit", - "message": "Rate limit exceeded for LLM model provider. 
Please wait before making another request.", - "detail": str(e), - } - } - stop_reason = StopReasonType.llm_api_error - logger.warning(f"Run {run_id} stopped with LLM rate limit error: {e}, error_data: {error_data}") - yield f"event: error\ndata: {json.dumps(error_data)}\n\n" + stop_reason = LettaStopReason(stop_reason=StopReasonType.llm_api_error) + error_message = LettaErrorMessage( + run_id=run_id, + error_type="llm_rate_limit", + message="Rate limit exceeded for LLM model provider. Please wait before making another request.", + detail=str(e), + ) + logger.warning(f"Run {run_id} stopped with LLM rate limit error: {e}, error_data: {error_message.model_dump()}") + yield f"data: {stop_reason.model_dump_json()}\n\n" + yield f"event: error\ndata: {error_message.model_dump_json()}\n\n" # Send [DONE] marker to properly close the stream yield "data: [DONE]\n\n" except LLMAuthenticationError as e: run_status = RunStatus.failed - error_data = { - "error": { - "type": "llm_authentication", - "message": "Authentication failed with the LLM model provider.", - "detail": str(e), - } - } - logger.warning(f"Run {run_id} stopped with LLM authentication error: {e}, error_data: {error_data}") - stop_reason = StopReasonType.llm_api_error - yield f"event: error\ndata: {json.dumps(error_data)}\n\n" + stop_reason = LettaStopReason(stop_reason=StopReasonType.llm_api_error) + error_message = LettaErrorMessage( + run_id=run_id, + error_type="llm_authentication", + message="Authentication failed with the LLM model provider.", + detail=str(e), + ) + logger.warning(f"Run {run_id} stopped with LLM authentication error: {e}, error_data: {error_message.model_dump()}") + yield f"data: {stop_reason.model_dump_json()}\n\n" + yield f"event: error\ndata: {error_message.model_dump_json()}\n\n" # Send [DONE] marker to properly close the stream yield "data: [DONE]\n\n" except LLMError as e: run_status = RunStatus.failed - error_data = {"error": {"type": "llm_error", "message": "An error occurred with the LLM request.", "detail": str(e)}} - logger.error(f"Run {run_id} stopped with LLM error: {e}, error_data: {error_data}") 
- stop_reason = StopReasonType.llm_api_error - yield f"event: error\ndata: {json.dumps(error_data)}\n\n" + stop_reason = LettaStopReason(stop_reason=StopReasonType.llm_api_error) + error_message = LettaErrorMessage( + run_id=run_id, + error_type="llm_error", + message="An error occurred with the LLM request.", + detail=str(e), + ) + logger.error(f"Run {run_id} stopped with LLM error: {e}, error_data: {error_message.model_dump()}") + yield f"data: {stop_reason.model_dump_json()}\n\n" + yield f"event: error\ndata: {error_message.model_dump_json()}\n\n" # Send [DONE] marker to properly close the stream yield "data: [DONE]\n\n" except Exception as e: run_status = RunStatus.failed - error_data = { - "error": { - "type": "internal_error", - "message": "An unknown error occurred with the LLM streaming request.", - "detail": str(e), - } - } - logger.error(f"Run {run_id} stopped with unknown error: {e}, error_data: {error_data}") - stop_reason = StopReasonType.error - yield f"event: error\ndata: {json.dumps(error_data)}\n\n" + stop_reason = LettaStopReason(stop_reason=StopReasonType.error) + error_message = LettaErrorMessage( + run_id=run_id, + error_type="internal_error", + message="An unknown error occurred with the LLM streaming request.", + detail=str(e), + ) + logger.error(f"Run {run_id} stopped with unknown error: {e}, error_data: {error_message.model_dump()}") + yield f"data: {stop_reason.model_dump_json()}\n\n" + yield f"event: error\ndata: {error_message.model_dump_json()}\n\n" # Send [DONE] marker to properly close the stream yield "data: [DONE]\n\n" # Capture for Sentry but don't re-raise to allow stream to complete gracefully diff --git a/tests/integration_test_human_in_the_loop.py b/tests/integration_test_human_in_the_loop.py index 2e6abc03..56785a9e 100644 --- a/tests/integration_test_human_in_the_loop.py +++ b/tests/integration_test_human_in_the_loop.py @@ -608,6 +608,9 @@ def test_approve_and_follow_up_with_error( messages = accumulate_chunks(response) assert messages is not None + print("\n\nmessages:\n\n") + for m in messages: + print(m) stop_reason_message = [m for m in messages if m.message_type == "stop_reason"][0] assert 
stop_reason_message.stop_reason == "invalid_llm_response"