feat: raise early exceptions in streaming endpoint (#2858)
This commit is contained in:
@@ -306,7 +306,7 @@ class LettaAgentV2(BaseAgentV2):
|
||||
)
|
||||
|
||||
except:
|
||||
if self.stop_reason:
|
||||
if self.stop_reason and not first_chunk:
|
||||
yield f"data: {self.stop_reason.model_dump_json()}\n\n"
|
||||
raise
|
||||
|
||||
|
||||
@@ -17,7 +17,15 @@ from starlette.middleware.cors import CORSMiddleware
|
||||
from letta.__init__ import __version__ as letta_version
|
||||
from letta.agents.exceptions import IncompatibleAgentType
|
||||
from letta.constants import ADMIN_PREFIX, API_PREFIX, OPENAI_API_PREFIX
|
||||
from letta.errors import BedrockPermissionError, LettaAgentNotFoundError, LettaUserNotFoundError
|
||||
from letta.errors import (
|
||||
BedrockPermissionError,
|
||||
LettaAgentNotFoundError,
|
||||
LettaUserNotFoundError,
|
||||
LLMAuthenticationError,
|
||||
LLMError,
|
||||
LLMRateLimitError,
|
||||
LLMTimeoutError,
|
||||
)
|
||||
from letta.helpers.pinecone_utils import get_pinecone_indices, should_use_pinecone, upsert_pinecone_indices
|
||||
from letta.jobs.scheduler import start_scheduler_with_leader_election
|
||||
from letta.log import get_logger
|
||||
@@ -276,6 +284,58 @@ def create_application() -> "FastAPI":
|
||||
},
|
||||
)
|
||||
|
||||
@app.exception_handler(LLMTimeoutError)
async def llm_timeout_error_handler(request: Request, exc: LLMTimeoutError):
    """Translate an ``LLMTimeoutError`` into a 504 JSON error response.

    Args:
        request: The request being served when the error was raised (not read here).
        exc: The raised timeout error; its string form is returned as ``detail``.

    Returns:
        A ``JSONResponse`` with status 504 whose body is
        ``{"error": {"type", "message", "detail"}}``.
    """
    error_body = {
        "type": "llm_timeout",
        "message": "The LLM request timed out. Please try again.",
        "detail": str(exc),
    }
    return JSONResponse(status_code=504, content={"error": error_body})
|
||||
|
||||
@app.exception_handler(LLMRateLimitError)
async def llm_rate_limit_error_handler(request: Request, exc: LLMRateLimitError):
    """Translate an ``LLMRateLimitError`` into a 429 JSON error response.

    Args:
        request: The request being served when the error was raised (not read here).
        exc: The raised rate-limit error; its string form is returned as ``detail``.

    Returns:
        A ``JSONResponse`` with status 429 whose body is
        ``{"error": {"type", "message", "detail"}}``.
    """
    error_body = {
        "type": "llm_rate_limit",
        "message": "Rate limit exceeded for LLM model provider. Please wait before making another request.",
        "detail": str(exc),
    }
    return JSONResponse(status_code=429, content={"error": error_body})
|
||||
|
||||
@app.exception_handler(LLMAuthenticationError)
async def llm_auth_error_handler(request: Request, exc: LLMAuthenticationError):
    """Translate an ``LLMAuthenticationError`` into a 401 JSON error response.

    Args:
        request: The request being served when the error was raised (not read here).
        exc: The raised authentication error; its string form is returned as ``detail``.

    Returns:
        A ``JSONResponse`` with status 401 whose body is
        ``{"error": {"type", "message", "detail"}}``.
    """
    error_body = {
        "type": "llm_authentication",
        "message": "Authentication failed with the LLM model provider.",
        "detail": str(exc),
    }
    return JSONResponse(status_code=401, content={"error": error_body})
|
||||
|
||||
@app.exception_handler(LLMError)
async def llm_error_handler(request: Request, exc: LLMError):
    """Translate an ``LLMError`` into a 502 JSON error response.

    Args:
        request: The request being served when the error was raised (not read here).
        exc: The raised LLM error; its string form is returned as ``detail``.

    Returns:
        A ``JSONResponse`` with status 502 whose body is
        ``{"error": {"type", "message", "detail"}}``.
    """
    error_body = {
        "type": "llm_error",
        "message": "An error occurred with the LLM request.",
        "detail": str(exc),
    }
    return JSONResponse(status_code=502, content={"error": error_body})
|
||||
|
||||
settings.cors_origins.append("https://app.letta.com")
|
||||
|
||||
if (os.getenv("LETTA_SERVER_SECURE") == "true") or "--secure" in sys.argv:
|
||||
|
||||
@@ -1316,15 +1316,52 @@ async def send_message_streaming(
|
||||
try:
|
||||
if agent_eligible and model_compatible:
|
||||
agent_loop = AgentLoop.load(agent_state=agent, actor=actor)
|
||||
raw_stream = agent_loop.stream(
|
||||
input_messages=request.messages,
|
||||
max_steps=request.max_steps,
|
||||
stream_tokens=request.stream_tokens and model_compatible_token_streaming,
|
||||
run_id=run.id if run else None,
|
||||
use_assistant_message=request.use_assistant_message,
|
||||
request_start_timestamp_ns=request_start_timestamp_ns,
|
||||
include_return_message_types=request.include_return_message_types,
|
||||
)
|
||||
|
||||
async def error_aware_stream():
    """Stream that handles early LLM errors gracefully in streaming format.

    Opens the agent loop's stream and relays every chunk unchanged. If one of
    the LLM error types below is raised while opening or consuming the stream,
    the error is not propagated; a single final item is yielded instead.

    Yields:
        Each chunk produced by ``agent_loop.stream(...)``; on an LLM error, a
        ``(sse_line, status)`` tuple where ``sse_line`` is a ``data: <json>``
        line terminated by a blank line and ``status`` is 504, 429, 401 or 502.
        NOTE(review): the tuple form is presumably what the streaming response
        wrapper expects for signalling a status code - confirm against it.
    """
    from letta.errors import LLMAuthenticationError, LLMError, LLMRateLimitError, LLMTimeoutError

    def _error_event(kind, message, err, status):
        # Build the (SSE data line, status code) pair for one error.
        payload = {"error": {"type": kind, "message": message, "detail": str(err)}}
        return (f"data: {json.dumps(payload)}\n\n", status)

    # The specific error types are listed before the generic LLMError so they
    # are matched first (presumably they subclass it - confirm in letta.errors).
    try:
        chunks = agent_loop.stream(
            input_messages=request.messages,
            max_steps=request.max_steps,
            stream_tokens=request.stream_tokens and model_compatible_token_streaming,
            run_id=run.id if run else None,
            use_assistant_message=request.use_assistant_message,
            request_start_timestamp_ns=request_start_timestamp_ns,
            include_return_message_types=request.include_return_message_types,
        )
        async for item in chunks:
            yield item
    except LLMTimeoutError as err:
        yield _error_event("llm_timeout", "The LLM request timed out. Please try again.", err, 504)
    except LLMRateLimitError as err:
        yield _error_event(
            "llm_rate_limit",
            "Rate limit exceeded for LLM model provider. Please wait before making another request.",
            err,
            429,
        )
    except LLMAuthenticationError as err:
        yield _error_event("llm_authentication", "Authentication failed with the LLM model provider.", err, 401)
    except LLMError as err:
        yield _error_event("llm_error", "An error occurred with the LLM request.", err, 502)
|
||||
|
||||
raw_stream = error_aware_stream()
|
||||
|
||||
from letta.server.rest_api.streaming_response import StreamingResponseWithStatusCode, add_keepalive_to_stream
|
||||
|
||||
|
||||
Reference in New Issue
Block a user