feat: catch asyncio cancellations from app in stream response (#4112)

This commit is contained in:
cthomas
2025-08-22 15:42:14 -07:00
committed by GitHub
parent 5d4509590e
commit b230b5faf5
2 changed files with 7 additions and 24 deletions

View File

@@ -76,6 +76,10 @@ class LettaUserNotFoundError(LettaError):
"""Error raised when a user is not found."""
class LettaUnexpectedStreamCancellationError(LettaError):
"""Error raised when a streaming request is terminated unexpectedly."""
class LLMError(LettaError):
pass

View File

@@ -10,6 +10,7 @@ import anyio
from fastapi.responses import StreamingResponse
from starlette.types import Send
from letta.errors import LettaUnexpectedStreamCancellationError
from letta.log import get_logger
from letta.schemas.enums import JobStatus
from letta.schemas.letta_ping import LettaPing
@@ -288,33 +289,11 @@ class StreamingResponseWithStatusCode(StreamingResponse):
# Handle client timeouts (should throw error to inform user)
except asyncio.CancelledError as exc:
logger.warning("Stream was cancelled due to client timeout or unexpected disconnection")
logger.warning("Stream was terminated due to unexpected cancellation from server")
# Handle unexpected cancellation with error
more_body = False
error_resp = {"error": {"message": "Request was unexpectedly cancelled (likely due to client timeout or disconnection)"}}
error_event = f"event: error\ndata: {json.dumps(error_resp)}\n\n".encode(self.charset)
if not self.response_started:
await send(
{
"type": "http.response.start",
"status": 408, # Request Timeout
"headers": self.raw_headers,
}
)
raise
if self._client_connected:
try:
await send(
{
"type": "http.response.body",
"body": error_event,
"more_body": more_body,
}
)
except anyio.ClosedResourceError:
self._client_connected = False
capture_sentry_exception(exc)
return
raise LettaUnexpectedStreamCancellationError("Stream was terminated due to unexpected cancellation from server")
except Exception as exc:
logger.exception("Unhandled Streaming Error")