From 0d195bd2b7f27b16f654305273b1cbf3fe11b9d2 Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Sat, 6 Sep 2025 11:45:46 -0700 Subject: [PATCH] fix(core): patch the error throwing for HITL [LET-4218] (#4455) fix: patch the error throwing for HITL --- letta/agents/helpers.py | 6 ++--- letta/errors.py | 12 +++++++++ letta/server/rest_api/routers/v1/agents.py | 29 ++++++++++++++++++++- letta/server/rest_api/streaming_response.py | 10 ++++++- 4 files changed, 51 insertions(+), 6 deletions(-) diff --git a/letta/agents/helpers.py b/letta/agents/helpers.py index d81fe6d0..d828adff 100644 --- a/letta/agents/helpers.py +++ b/letta/agents/helpers.py @@ -3,6 +3,7 @@ import uuid import xml.etree.ElementTree as ET from typing import List, Optional, Tuple +from letta.errors import PendingApprovalError from letta.helpers import ToolRulesSolver from letta.log import get_logger from letta.schemas.agent import AgentState @@ -168,10 +169,7 @@ async def _prepare_in_context_messages_no_persist_async( else: # User is trying to send a regular message if current_in_context_messages[-1].role == "approval": - raise ValueError( - "Cannot send a new message: The agent is waiting for approval on a tool call. " - "Please approve or deny the pending request before continuing." - ) + raise PendingApprovalError(pending_request_id=current_in_context_messages[-1].id) # Create a new user message from the input but dont store it yet new_in_context_messages = create_input_messages( diff --git a/letta/errors.py b/letta/errors.py index f3188a96..1d154d31 100644 --- a/letta/errors.py +++ b/letta/errors.py @@ -18,6 +18,7 @@ class ErrorCode(Enum): CONTEXT_WINDOW_EXCEEDED = "CONTEXT_WINDOW_EXCEEDED" RATE_LIMIT_EXCEEDED = "RATE_LIMIT_EXCEEDED" TIMEOUT = "TIMEOUT" + CONFLICT = "CONFLICT" class LettaError(Exception): @@ -40,6 +41,17 @@ class LettaError(Exception): return f"{self.__class__.__name__}(message='{self.message}', code='{self.code}', details={self.details})" +class PendingApprovalError(LettaError): + """Error raised when attempting an operation while agent is waiting for tool approval.""" + + def __init__(self, pending_request_id: Optional[str] = None): + self.pending_request_id = pending_request_id + message = "Cannot send a new message: The agent is waiting for approval on a tool call. Please approve or deny the pending request before continuing." + code = ErrorCode.CONFLICT + details = {"error_code": "PENDING_APPROVAL", "pending_request_id": pending_request_id} + super().__init__(message=message, code=code, details=details) + + class LettaToolCreateError(LettaError): """Error raised when a tool cannot be created.""" diff --git a/letta/server/rest_api/routers/v1/agents.py b/letta/server/rest_api/routers/v1/agents.py index b715704c..6af06037 100644 --- a/letta/server/rest_api/routers/v1/agents.py +++ b/letta/server/rest_api/routers/v1/agents.py @@ -15,7 +15,13 @@ from starlette.responses import Response, StreamingResponse from letta.agents.letta_agent import LettaAgent from letta.constants import AGENT_ID_PATTERN, DEFAULT_MAX_STEPS, DEFAULT_MESSAGE_TOOL, DEFAULT_MESSAGE_TOOL_KWARG, REDIS_RUN_ID_PREFIX from letta.data_sources.redis_client import NoopAsyncRedisClient, get_redis_client -from letta.errors import AgentExportIdMappingError, AgentExportProcessingError, AgentFileImportError, AgentNotFoundForExportError +from letta.errors import ( + AgentExportIdMappingError, + AgentExportProcessingError, + AgentFileImportError, + AgentNotFoundForExportError, + PendingApprovalError, +) from letta.groups.sleeptime_multi_agent_v2 import SleeptimeMultiAgentV2 from letta.helpers.datetime_helpers import get_utc_timestamp_ns from letta.log import get_logger @@ -1239,6 +1245,12 @@ async def send_message( ) job_status = result.stop_reason.stop_reason.run_status return result + except PendingApprovalError as e: + job_update_metadata = {"error": str(e)} + job_status = JobStatus.failed + raise HTTPException( + status_code=409, detail={"code": "PENDING_APPROVAL", "message": str(e), "pending_request_id": e.pending_request_id} + ) except Exception as e: job_update_metadata = {"error": str(e)} job_status = JobStatus.failed @@ -1437,6 +1449,13 @@ async def send_message_streaming( if settings.track_agent_run: job_status = JobStatus.running return result + except PendingApprovalError as e: + if settings.track_agent_run: + job_update_metadata = {"error": str(e)} + job_status = JobStatus.failed + raise HTTPException( + status_code=409, detail={"code": "PENDING_APPROVAL", "message": str(e), "pending_request_id": e.pending_request_id} + ) except Exception as e: if settings.track_agent_run: job_update_metadata = {"error": str(e)} @@ -1625,6 +1644,14 @@ async def _process_message_background( ) await server.job_manager.update_job_by_id_async(job_id=run_id, job_update=job_update, actor=actor) + except PendingApprovalError as e: + # Update job status to failed with specific error info + job_update = JobUpdate( + status=JobStatus.failed, + completed_at=datetime.now(timezone.utc), + metadata={"error": str(e), "error_code": "PENDING_APPROVAL", "pending_request_id": e.pending_request_id}, + ) + await server.job_manager.update_job_by_id_async(job_id=run_id, job_update=job_update, actor=actor) except Exception as e: # Update job status to failed job_update = JobUpdate( diff --git a/letta/server/rest_api/streaming_response.py b/letta/server/rest_api/streaming_response.py index 9d1b01c9..8b11ab33 100644 --- a/letta/server/rest_api/streaming_response.py +++ b/letta/server/rest_api/streaming_response.py @@ -7,10 +7,11 @@ import json from collections.abc import AsyncIterator import anyio +from fastapi import HTTPException from fastapi.responses import StreamingResponse from starlette.types import Send -from letta.errors import LettaUnexpectedStreamCancellationError +from letta.errors import LettaUnexpectedStreamCancellationError, PendingApprovalError from letta.log import get_logger from letta.schemas.enums import JobStatus from letta.schemas.letta_ping import LettaPing @@ -189,6 +190,13 @@ class StreamingResponseWithStatusCode(StreamingResponse): except anyio.ClosedResourceError: logger.info("Client disconnected, but shielded task should continue") self._client_connected = False + except PendingApprovalError as e: + # This is an expected error, don't log as error + logger.info(f"Pending approval conflict in stream response: {e}") + # Re-raise as HTTPException for proper client handling + raise HTTPException( + status_code=409, detail={"code": "PENDING_APPROVAL", "message": str(e), "pending_request_id": e.pending_request_id} + ) except Exception as e: logger.error(f"Error in protected stream response: {e}") raise