From f2171447a8c4ed7f1e1ad1702ac861e6e598f896 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 8 Jan 2026 11:52:46 -0800 Subject: [PATCH] fix: handle httpx.ReadError, WriteError, and ConnectError in LLM streaming clients (#8243) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds explicit handling for httpx network errors (ReadError, WriteError, ConnectError) in AnthropicClient, OpenAIClient, and GoogleVertexClient. These errors can occur during streaming when the connection is unexpectedly closed while reading/writing data. Maps these errors to LLMConnectionError for consistent error handling. Fixes #8221 (and duplicate #8156) 🤖 Generated with [Letta Code](https://letta.com) Co-authored-by: letta-code <248085862+letta-code@users.noreply.github.com> Co-authored-by: Letta Co-authored-by: Kian Jones <11655409+kianjones9@users.noreply.github.com> --- letta/llm_api/anthropic_client.py | 10 +++ letta/llm_api/google_vertex_client.py | 10 +++ letta/llm_api/openai_client.py | 10 +++ ...letta_llm_stream_adapter_error_handling.py | 70 ++++++++++++++++++- 4 files changed, 99 insertions(+), 1 deletion(-) diff --git a/letta/llm_api/anthropic_client.py b/letta/llm_api/anthropic_client.py index cc7b4f0b..0e78956b 100644 --- a/letta/llm_api/anthropic_client.py +++ b/letta/llm_api/anthropic_client.py @@ -763,6 +763,16 @@ class AnthropicClient(LLMClientBase): details={"cause": str(e.__cause__) if e.__cause__ else None}, ) + # Handle httpx network errors which can occur during streaming + # when the connection is unexpectedly closed while reading/writing + if isinstance(e, (httpx.ReadError, httpx.WriteError, httpx.ConnectError)): + logger.warning(f"[Anthropic] Network error during streaming: {type(e).__name__}: {e}") + return LLMConnectionError( + message=f"Network error during Anthropic streaming: {str(e)}", + code=ErrorCode.INTERNAL_SERVER_ERROR, + details={"cause": str(e.__cause__) if e.__cause__ else None, "error_type": type(e).__name__}, + ) + if isinstance(e, anthropic.RateLimitError): logger.warning("[Anthropic] Rate limited (429). Consider backoff.") return LLMRateLimitError( diff --git a/letta/llm_api/google_vertex_client.py b/letta/llm_api/google_vertex_client.py index 6b950371..0bd13ed0 100644 --- a/letta/llm_api/google_vertex_client.py +++ b/letta/llm_api/google_vertex_client.py @@ -872,6 +872,16 @@ class GoogleVertexClient(LLMClientBase): details={"cause": str(e.__cause__) if e.__cause__ else None}, ) + # Handle httpx network errors which can occur during streaming + # when the connection is unexpectedly closed while reading/writing + if isinstance(e, (httpx.ReadError, httpx.WriteError, httpx.ConnectError)): + logger.warning(f"{self._provider_prefix()} Network error during streaming: {type(e).__name__}: {e}") + return LLMConnectionError( + message=f"Network error during {self._provider_name()} streaming: {str(e)}", + code=ErrorCode.INTERNAL_SERVER_ERROR, + details={"cause": str(e.__cause__) if e.__cause__ else None, "error_type": type(e).__name__}, + ) + # Handle connection-related errors if "connection" in str(e).lower() or "timeout" in str(e).lower(): logger.warning(f"{self._provider_prefix()} Connection/timeout error: {e}") diff --git a/letta/llm_api/openai_client.py b/letta/llm_api/openai_client.py index e2760c9e..03de4ca4 100644 --- a/letta/llm_api/openai_client.py +++ b/letta/llm_api/openai_client.py @@ -972,6 +972,16 @@ class OpenAIClient(LLMClientBase): details={"cause": str(e.__cause__) if e.__cause__ else None}, ) + # Handle httpx network errors which can occur during streaming + # when the connection is unexpectedly closed while reading/writing + if isinstance(e, (httpx.ReadError, httpx.WriteError, httpx.ConnectError)): + logger.warning(f"[OpenAI] Network error during streaming: {type(e).__name__}: {e}") + return LLMConnectionError( + message=f"Network error during OpenAI streaming: {str(e)}", + code=ErrorCode.INTERNAL_SERVER_ERROR, + details={"cause": str(e.__cause__) if e.__cause__ else None, "error_type": type(e).__name__}, + ) + if isinstance(e, openai.RateLimitError): logger.warning(f"[OpenAI] Rate limited (429). Consider backoff. Error: {e}") return LLMRateLimitError( diff --git a/tests/adapters/test_letta_llm_stream_adapter_error_handling.py b/tests/adapters/test_letta_llm_stream_adapter_error_handling.py index a0276757..3241ce7e 100644 --- a/tests/adapters/test_letta_llm_stream_adapter_error_handling.py +++ b/tests/adapters/test_letta_llm_stream_adapter_error_handling.py @@ -3,7 +3,7 @@ import httpx import pytest from letta.adapters.letta_llm_stream_adapter import LettaLLMStreamAdapter -from letta.errors import ContextWindowExceededError, LLMServerError +from letta.errors import ContextWindowExceededError, LLMConnectionError, LLMServerError from letta.llm_api.anthropic_client import AnthropicClient from letta.schemas.llm_config import LLMConfig @@ -91,6 +91,74 @@ async def test_letta_llm_stream_adapter_converts_anthropic_413_request_too_large pass +@pytest.mark.asyncio +async def test_letta_llm_stream_adapter_converts_httpx_read_error(monkeypatch): + """Regression: httpx.ReadError raised during streaming should be converted to LLMConnectionError.""" + + class FakeAsyncStream: + """Mimics anthropic.AsyncStream enough for AnthropicStreamingInterface (async cm + async iterator).""" + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return None + + def __aiter__(self): + return self + + async def __anext__(self): + raise httpx.ReadError("Connection closed unexpectedly") + + async def fake_stream_async(self, request_data: dict, llm_config: LLMConfig): + return FakeAsyncStream() + + monkeypatch.setattr(AnthropicClient, "stream_async", fake_stream_async, raising=True) + + llm_client = AnthropicClient() + llm_config = LLMConfig(model="claude-sonnet-4-5-20250929", model_endpoint_type="anthropic", context_window=200000) + adapter = LettaLLMStreamAdapter(llm_client=llm_client, llm_config=llm_config) + + gen = adapter.invoke_llm(request_data={}, messages=[], tools=[], use_assistant_message=True) + with pytest.raises(LLMConnectionError): + async for _ in gen: + pass + + +@pytest.mark.asyncio +async def test_letta_llm_stream_adapter_converts_httpx_write_error(monkeypatch): + """Regression: httpx.WriteError raised during streaming should be converted to LLMConnectionError.""" + + class FakeAsyncStream: + """Mimics anthropic.AsyncStream enough for AnthropicStreamingInterface (async cm + async iterator).""" + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return None + + def __aiter__(self): + return self + + async def __anext__(self): + raise httpx.WriteError("Failed to write to connection") + + async def fake_stream_async(self, request_data: dict, llm_config: LLMConfig): + return FakeAsyncStream() + + monkeypatch.setattr(AnthropicClient, "stream_async", fake_stream_async, raising=True) + + llm_client = AnthropicClient() + llm_config = LLMConfig(model="claude-sonnet-4-5-20250929", model_endpoint_type="anthropic", context_window=200000) + adapter = LettaLLMStreamAdapter(llm_client=llm_client, llm_config=llm_config) + + gen = adapter.invoke_llm(request_data={}, messages=[], tools=[], use_assistant_message=True) + with pytest.raises(LLMConnectionError): + async for _ in gen: + pass + + def test_anthropic_client_handle_llm_error_413_status_code(): """Test that handle_llm_error correctly converts 413 status code to ContextWindowExceededError.""" client = AnthropicClient()