From 3634464251214c9fd02b4f32c97568b5dab5e144 Mon Sep 17 00:00:00 2001 From: Kian Jones <11655409+kianjones9@users.noreply.github.com> Date: Wed, 11 Feb 2026 11:07:09 -0800 Subject: [PATCH] fix(core): handle anyio.BrokenResourceError for client disconnects (#9358) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Catch BrokenResourceError alongside ClosedResourceError in streaming response, logging middleware, and app exception handlers so client disconnects are logged at info level instead of surfacing as 500s. Datadog: https://us5.datadoghq.com/error-tracking/issue/4f57af0c-d558-11f0-a65d-da7ad0900000 🤖 Generated with [Letta Code](https://letta.com) Co-authored-by: Letta --- letta/server/rest_api/app.py | 7 +++++++ letta/server/rest_api/middleware/logging.py | 6 ++++++ letta/server/rest_api/streaming_response.py | 13 ++++++------- 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/letta/server/rest_api/app.py b/letta/server/rest_api/app.py index 4dd03eb4..656ab33c 100644 --- a/letta/server/rest_api/app.py +++ b/letta/server/rest_api/app.py @@ -11,6 +11,7 @@ from functools import partial from pathlib import Path from typing import Optional +import anyio import uvicorn # Enable Python fault handler to get stack traces on segfaults @@ -415,6 +416,12 @@ def create_application() -> "FastAPI": # === Exception Handlers === # TODO (cliandy): move to separate file + @app.exception_handler(anyio.BrokenResourceError) + @app.exception_handler(anyio.ClosedResourceError) + async def client_disconnect_handler(request: Request, exc: Exception): + logger.info(f"Client disconnected: {request.method} {request.url.path}") + return JSONResponse(status_code=499, content={"detail": "Client disconnected"}) + @app.exception_handler(Exception) async def generic_error_handler(request: Request, exc: Exception): # Log with structured context diff --git a/letta/server/rest_api/middleware/logging.py b/letta/server/rest_api/middleware/logging.py index 16c33f7c..b85519b7 100644 --- a/letta/server/rest_api/middleware/logging.py +++ b/letta/server/rest_api/middleware/logging.py @@ -125,6 +125,12 @@ class LoggingMiddleware(BaseHTTPMiddleware): return response except Exception as exc: + import anyio + + if isinstance(exc, (anyio.BrokenResourceError, anyio.ClosedResourceError)): + logger.info(f"Client disconnected during request: {request.method} {request.url.path}") + raise + # Extract request context request_context = { "method": request.method, diff --git a/letta/server/rest_api/streaming_response.py b/letta/server/rest_api/streaming_response.py index 02d727ff..db117786 100644 --- a/letta/server/rest_api/streaming_response.py +++ b/letta/server/rest_api/streaming_response.py @@ -228,7 +228,7 @@ class StreamingResponseWithStatusCode(StreamingResponse): await asyncio.shield(self._protected_stream_response(send)) except asyncio.CancelledError: logger.info("Stream response was cancelled, but shielded task should continue") - except anyio.ClosedResourceError: + except (anyio.ClosedResourceError, anyio.BrokenResourceError): logger.info("Client disconnected, but shielded task should continue") self._client_connected = False except PendingApprovalError as e: @@ -272,7 +272,7 @@ class StreamingResponseWithStatusCode(StreamingResponse): "more_body": more_body, } ) - except anyio.ClosedResourceError: + except (anyio.ClosedResourceError, anyio.BrokenResourceError): logger.info("Client disconnected during initial response, continuing processing without sending more chunks") self._client_connected = False @@ -302,10 +302,9 @@ class StreamingResponseWithStatusCode(StreamingResponse): "more_body": more_body, } ) - except anyio.ClosedResourceError: + except (anyio.ClosedResourceError, anyio.BrokenResourceError): logger.info("Client disconnected, continuing processing without sending more data") self._client_connected = False - # Continue processing but don't try to send more data # Handle explicit run cancellations (should not throw error) except RunCancelledException as exc: @@ -332,7 +331,7 @@ class StreamingResponseWithStatusCode(StreamingResponse): "more_body": more_body, } ) - except anyio.ClosedResourceError: + except (anyio.ClosedResourceError, anyio.BrokenResourceError): self._client_connected = False return @@ -369,7 +368,7 @@ class StreamingResponseWithStatusCode(StreamingResponse): "more_body": more_body, } ) - except anyio.ClosedResourceError: + except (anyio.ClosedResourceError, anyio.BrokenResourceError): self._client_connected = False capture_sentry_exception(exc) @@ -377,5 +376,5 @@ class StreamingResponseWithStatusCode(StreamingResponse): if more_body and self._client_connected: try: await send({"type": "http.response.body", "body": b"", "more_body": False}) - except anyio.ClosedResourceError: + except (anyio.ClosedResourceError, anyio.BrokenResourceError): self._client_connected = False