feat: catch closed resource error in stream processing (#4003)
This commit is contained in:
@@ -6,6 +6,7 @@ import asyncio
|
||||
import json
|
||||
from collections.abc import AsyncIterator
|
||||
|
||||
import anyio
|
||||
from fastapi.responses import StreamingResponse
|
||||
from starlette.types import Send
|
||||
|
||||
@@ -176,6 +177,7 @@ class StreamingResponseWithStatusCode(StreamingResponse):
|
||||
|
||||
body_iterator: AsyncIterator[str | bytes]
|
||||
response_started: bool = False
|
||||
_client_connected: bool = True
|
||||
|
||||
async def stream_response(self, send: Send) -> None:
|
||||
if settings.use_asyncio_shield:
|
||||
@@ -183,6 +185,9 @@ class StreamingResponseWithStatusCode(StreamingResponse):
|
||||
await asyncio.shield(self._protected_stream_response(send))
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"Stream response was cancelled, but shielded task should continue")
|
||||
except anyio.ClosedResourceError:
|
||||
logger.info("Client disconnected, but shielded task should continue")
|
||||
self._client_connected = False
|
||||
except Exception as e:
|
||||
logger.error(f"Error in protected stream response: {e}")
|
||||
raise
|
||||
@@ -201,21 +206,25 @@ class StreamingResponseWithStatusCode(StreamingResponse):
|
||||
if isinstance(first_chunk_content, str):
|
||||
first_chunk_content = first_chunk_content.encode(self.charset)
|
||||
|
||||
await send(
|
||||
{
|
||||
"type": "http.response.start",
|
||||
"status": self.status_code,
|
||||
"headers": self.raw_headers,
|
||||
}
|
||||
)
|
||||
self.response_started = True
|
||||
await send(
|
||||
{
|
||||
"type": "http.response.body",
|
||||
"body": first_chunk_content,
|
||||
"more_body": more_body,
|
||||
}
|
||||
)
|
||||
try:
|
||||
await send(
|
||||
{
|
||||
"type": "http.response.start",
|
||||
"status": self.status_code,
|
||||
"headers": self.raw_headers,
|
||||
}
|
||||
)
|
||||
self.response_started = True
|
||||
await send(
|
||||
{
|
||||
"type": "http.response.body",
|
||||
"body": first_chunk_content,
|
||||
"more_body": more_body,
|
||||
}
|
||||
)
|
||||
except anyio.ClosedResourceError:
|
||||
logger.info("Client disconnected during initial response, continuing processing without sending more chunks")
|
||||
self._client_connected = False
|
||||
|
||||
async for chunk in self.body_iterator:
|
||||
if isinstance(chunk, tuple):
|
||||
@@ -232,13 +241,21 @@ class StreamingResponseWithStatusCode(StreamingResponse):
|
||||
if isinstance(content, str):
|
||||
content = content.encode(self.charset)
|
||||
more_body = True
|
||||
await send(
|
||||
{
|
||||
"type": "http.response.body",
|
||||
"body": content,
|
||||
"more_body": more_body,
|
||||
}
|
||||
)
|
||||
|
||||
# Only attempt to send if client is still connected
|
||||
if self._client_connected:
|
||||
try:
|
||||
await send(
|
||||
{
|
||||
"type": "http.response.body",
|
||||
"body": content,
|
||||
"more_body": more_body,
|
||||
}
|
||||
)
|
||||
except anyio.ClosedResourceError:
|
||||
logger.info("Client disconnected, continuing processing without sending more data")
|
||||
self._client_connected = False
|
||||
# Continue processing but don't try to send more data
|
||||
|
||||
# Handle explicit job cancellations (should not throw error)
|
||||
except JobCancelledException as exc:
|
||||
|
||||
Reference in New Issue
Block a user