feat: catch asyncio cancellations from app in stream response (#4112)
This commit is contained in:
@@ -76,6 +76,10 @@ class LettaUserNotFoundError(LettaError):
|
||||
"""Error raised when a user is not found."""
|
||||
|
||||
|
||||
class LettaUnexpectedStreamCancellationError(LettaError):
|
||||
"""Error raised when a streaming request is terminated unexpectedly."""
|
||||
|
||||
|
||||
class LLMError(LettaError):
|
||||
pass
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ import anyio
|
||||
from fastapi.responses import StreamingResponse
|
||||
from starlette.types import Send
|
||||
|
||||
from letta.errors import LettaUnexpectedStreamCancellationError
|
||||
from letta.log import get_logger
|
||||
from letta.schemas.enums import JobStatus
|
||||
from letta.schemas.letta_ping import LettaPing
|
||||
@@ -288,33 +289,11 @@ class StreamingResponseWithStatusCode(StreamingResponse):
|
||||
|
||||
# Handle client timeouts (should throw error to inform user)
|
||||
except asyncio.CancelledError as exc:
|
||||
logger.warning("Stream was cancelled due to client timeout or unexpected disconnection")
|
||||
logger.warning("Stream was terminated due to unexpected cancellation from server")
|
||||
# Handle unexpected cancellation with error
|
||||
more_body = False
|
||||
error_resp = {"error": {"message": "Request was unexpectedly cancelled (likely due to client timeout or disconnection)"}}
|
||||
error_event = f"event: error\ndata: {json.dumps(error_resp)}\n\n".encode(self.charset)
|
||||
if not self.response_started:
|
||||
await send(
|
||||
{
|
||||
"type": "http.response.start",
|
||||
"status": 408, # Request Timeout
|
||||
"headers": self.raw_headers,
|
||||
}
|
||||
)
|
||||
raise
|
||||
if self._client_connected:
|
||||
try:
|
||||
await send(
|
||||
{
|
||||
"type": "http.response.body",
|
||||
"body": error_event,
|
||||
"more_body": more_body,
|
||||
}
|
||||
)
|
||||
except anyio.ClosedResourceError:
|
||||
self._client_connected = False
|
||||
capture_sentry_exception(exc)
|
||||
return
|
||||
raise LettaUnexpectedStreamCancellationError("Stream was terminated due to unexpected cancellation from server")
|
||||
|
||||
except Exception as exc:
|
||||
logger.exception("Unhandled Streaming Error")
|
||||
|
||||
Reference in New Issue
Block a user