diff --git a/letta/server/rest_api/streaming_response.py b/letta/server/rest_api/streaming_response.py index 87c8d60a..4cfe9819 100644 --- a/letta/server/rest_api/streaming_response.py +++ b/letta/server/rest_api/streaming_response.py @@ -6,6 +6,7 @@ import asyncio import json from collections.abc import AsyncIterator +import anyio from fastapi.responses import StreamingResponse from starlette.types import Send @@ -176,6 +177,7 @@ class StreamingResponseWithStatusCode(StreamingResponse): body_iterator: AsyncIterator[str | bytes] response_started: bool = False + _client_connected: bool = True async def stream_response(self, send: Send) -> None: if settings.use_asyncio_shield: @@ -183,6 +185,9 @@ class StreamingResponseWithStatusCode(StreamingResponse): await asyncio.shield(self._protected_stream_response(send)) except asyncio.CancelledError: logger.info(f"Stream response was cancelled, but shielded task should continue") + except anyio.ClosedResourceError: + logger.info("Client disconnected, but shielded task should continue") + self._client_connected = False except Exception as e: logger.error(f"Error in protected stream response: {e}") raise @@ -201,21 +206,25 @@ class StreamingResponseWithStatusCode(StreamingResponse): if isinstance(first_chunk_content, str): first_chunk_content = first_chunk_content.encode(self.charset) - await send( - { - "type": "http.response.start", - "status": self.status_code, - "headers": self.raw_headers, - } - ) - self.response_started = True - await send( - { - "type": "http.response.body", - "body": first_chunk_content, - "more_body": more_body, - } - ) + try: + await send( + { + "type": "http.response.start", + "status": self.status_code, + "headers": self.raw_headers, + } + ) + self.response_started = True + await send( + { + "type": "http.response.body", + "body": first_chunk_content, + "more_body": more_body, + } + ) + except anyio.ClosedResourceError: + logger.info("Client disconnected during initial response, continuing processing without sending more chunks") + self._client_connected = False async for chunk in self.body_iterator: if isinstance(chunk, tuple): @@ -232,13 +241,21 @@ class StreamingResponseWithStatusCode(StreamingResponse): if isinstance(content, str): content = content.encode(self.charset) more_body = True - await send( - { - "type": "http.response.body", - "body": content, - "more_body": more_body, - } - ) + + # Only attempt to send if client is still connected + if self._client_connected: + try: + await send( + { + "type": "http.response.body", + "body": content, + "more_body": more_body, + } + ) + except anyio.ClosedResourceError: + logger.info("Client disconnected, continuing processing without sending more data") + self._client_connected = False + # Continue processing but don't try to send more data # Handle explicit job cancellations (should not throw error) except JobCancelledException as exc: