fix(core): handle anyio.BrokenResourceError for client disconnects (#9358)
Catch BrokenResourceError alongside ClosedResourceError in streaming response, logging middleware, and app exception handlers so client disconnects are logged at info level instead of surfacing as 500s. Datadog: https://us5.datadoghq.com/error-tracking/issue/4f57af0c-d558-11f0-a65d-da7ad0900000 🤖 Generated with [Letta Code](https://letta.com) Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
@@ -11,6 +11,7 @@ from functools import partial
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
|
import anyio
|
||||||
import uvicorn
|
import uvicorn
|
||||||
|
|
||||||
# Enable Python fault handler to get stack traces on segfaults
|
# Enable Python fault handler to get stack traces on segfaults
|
||||||
@@ -415,6 +416,12 @@ def create_application() -> "FastAPI":
|
|||||||
# === Exception Handlers ===
|
# === Exception Handlers ===
|
||||||
# TODO (cliandy): move to separate file
|
# TODO (cliandy): move to separate file
|
||||||
|
|
||||||
|
@app.exception_handler(anyio.BrokenResourceError)
|
||||||
|
@app.exception_handler(anyio.ClosedResourceError)
|
||||||
|
async def client_disconnect_handler(request: Request, exc: Exception):
|
||||||
|
logger.info(f"Client disconnected: {request.method} {request.url.path}")
|
||||||
|
return JSONResponse(status_code=499, content={"detail": "Client disconnected"})
|
||||||
|
|
||||||
@app.exception_handler(Exception)
|
@app.exception_handler(Exception)
|
||||||
async def generic_error_handler(request: Request, exc: Exception):
|
async def generic_error_handler(request: Request, exc: Exception):
|
||||||
# Log with structured context
|
# Log with structured context
|
||||||
|
|||||||
@@ -125,6 +125,12 @@ class LoggingMiddleware(BaseHTTPMiddleware):
|
|||||||
return response
|
return response
|
||||||
|
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
|
import anyio
|
||||||
|
|
||||||
|
if isinstance(exc, (anyio.BrokenResourceError, anyio.ClosedResourceError)):
|
||||||
|
logger.info(f"Client disconnected during request: {request.method} {request.url.path}")
|
||||||
|
raise
|
||||||
|
|
||||||
# Extract request context
|
# Extract request context
|
||||||
request_context = {
|
request_context = {
|
||||||
"method": request.method,
|
"method": request.method,
|
||||||
|
|||||||
@@ -228,7 +228,7 @@ class StreamingResponseWithStatusCode(StreamingResponse):
|
|||||||
await asyncio.shield(self._protected_stream_response(send))
|
await asyncio.shield(self._protected_stream_response(send))
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
logger.info("Stream response was cancelled, but shielded task should continue")
|
logger.info("Stream response was cancelled, but shielded task should continue")
|
||||||
except anyio.ClosedResourceError:
|
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
|
||||||
logger.info("Client disconnected, but shielded task should continue")
|
logger.info("Client disconnected, but shielded task should continue")
|
||||||
self._client_connected = False
|
self._client_connected = False
|
||||||
except PendingApprovalError as e:
|
except PendingApprovalError as e:
|
||||||
@@ -272,7 +272,7 @@ class StreamingResponseWithStatusCode(StreamingResponse):
|
|||||||
"more_body": more_body,
|
"more_body": more_body,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
except anyio.ClosedResourceError:
|
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
|
||||||
logger.info("Client disconnected during initial response, continuing processing without sending more chunks")
|
logger.info("Client disconnected during initial response, continuing processing without sending more chunks")
|
||||||
self._client_connected = False
|
self._client_connected = False
|
||||||
|
|
||||||
@@ -302,10 +302,9 @@ class StreamingResponseWithStatusCode(StreamingResponse):
|
|||||||
"more_body": more_body,
|
"more_body": more_body,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
except anyio.ClosedResourceError:
|
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
|
||||||
logger.info("Client disconnected, continuing processing without sending more data")
|
logger.info("Client disconnected, continuing processing without sending more data")
|
||||||
self._client_connected = False
|
self._client_connected = False
|
||||||
# Continue processing but don't try to send more data
|
|
||||||
|
|
||||||
# Handle explicit run cancellations (should not throw error)
|
# Handle explicit run cancellations (should not throw error)
|
||||||
except RunCancelledException as exc:
|
except RunCancelledException as exc:
|
||||||
@@ -332,7 +331,7 @@ class StreamingResponseWithStatusCode(StreamingResponse):
|
|||||||
"more_body": more_body,
|
"more_body": more_body,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
except anyio.ClosedResourceError:
|
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
|
||||||
self._client_connected = False
|
self._client_connected = False
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -369,7 +368,7 @@ class StreamingResponseWithStatusCode(StreamingResponse):
|
|||||||
"more_body": more_body,
|
"more_body": more_body,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
except anyio.ClosedResourceError:
|
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
|
||||||
self._client_connected = False
|
self._client_connected = False
|
||||||
|
|
||||||
capture_sentry_exception(exc)
|
capture_sentry_exception(exc)
|
||||||
@@ -377,5 +376,5 @@ class StreamingResponseWithStatusCode(StreamingResponse):
|
|||||||
if more_body and self._client_connected:
|
if more_body and self._client_connected:
|
||||||
try:
|
try:
|
||||||
await send({"type": "http.response.body", "body": b"", "more_body": False})
|
await send({"type": "http.response.body", "body": b"", "more_body": False})
|
||||||
except anyio.ClosedResourceError:
|
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
|
||||||
self._client_connected = False
|
self._client_connected = False
|
||||||
|
|||||||
Reference in New Issue
Block a user