From eb472dc1e043102ab8dee99d9a6896d17b4eb837 Mon Sep 17 00:00:00 2001
From: cthomas
Date: Mon, 18 Aug 2025 17:11:19 -0700
Subject: [PATCH] feat: introduce asyncio shield to stream response (#3992)

---
 letta/server/rest_api/streaming_response.py | 13 +++++++++++++
 letta/settings.py                           |  1 +
 2 files changed, 14 insertions(+)

diff --git a/letta/server/rest_api/streaming_response.py b/letta/server/rest_api/streaming_response.py
index 59aa4bc4..87c8d60a 100644
--- a/letta/server/rest_api/streaming_response.py
+++ b/letta/server/rest_api/streaming_response.py
@@ -15,6 +15,7 @@ from letta.schemas.letta_ping import LettaPing
 from letta.schemas.user import User
 from letta.server.rest_api.utils import capture_sentry_exception
 from letta.services.job_manager import JobManager
+from letta.settings import settings
 
 logger = get_logger(__name__)
 
@@ -177,6 +178,18 @@ class StreamingResponseWithStatusCode(StreamingResponse):
     response_started: bool = False
 
     async def stream_response(self, send: Send) -> None:
+        if settings.use_asyncio_shield:
+            try:
+                await asyncio.shield(self._protected_stream_response(send))
+            except asyncio.CancelledError:
+                logger.info(f"Stream response was cancelled, but shielded task should continue")
+            except Exception as e:
+                logger.error(f"Error in protected stream response: {e}")
+                raise
+        else:
+            await self._protected_stream_response(send)
+
+    async def _protected_stream_response(self, send: Send) -> None:
         more_body = True
         try:
             first_chunk = await self.body_iterator.__anext__()
diff --git a/letta/settings.py b/letta/settings.py
index 0aeae09c..f497e6fd 100644
--- a/letta/settings.py
+++ b/letta/settings.py
@@ -268,6 +268,7 @@ class Settings(BaseSettings):
     # experimental toggle
     use_experimental: bool = False
     use_vertex_structured_outputs_experimental: bool = False
+    use_asyncio_shield: bool = True
 
     # Database pool monitoring
     enable_db_pool_monitoring: bool = True  # Enable connection pool monitoring