diff --git a/letta/llm_api/anthropic_client.py b/letta/llm_api/anthropic_client.py
index cc7b4f0b..0e78956b 100644
--- a/letta/llm_api/anthropic_client.py
+++ b/letta/llm_api/anthropic_client.py
@@ -763,6 +763,16 @@ class AnthropicClient(LLMClientBase):
                 details={"cause": str(e.__cause__) if e.__cause__ else None},
             )
 
+        # Handle httpx network errors which can occur during streaming
+        # when the connection is unexpectedly closed while reading/writing
+        if isinstance(e, (httpx.ReadError, httpx.WriteError, httpx.ConnectError)):
+            logger.warning(f"[Anthropic] Network error during streaming: {type(e).__name__}: {e}")
+            return LLMConnectionError(
+                message=f"Network error during Anthropic streaming: {str(e)}",
+                code=ErrorCode.INTERNAL_SERVER_ERROR,
+                details={"cause": str(e.__cause__) if e.__cause__ else None, "error_type": type(e).__name__},
+            )
+
         if isinstance(e, anthropic.RateLimitError):
             logger.warning("[Anthropic] Rate limited (429). Consider backoff.")
             return LLMRateLimitError(
diff --git a/letta/llm_api/google_vertex_client.py b/letta/llm_api/google_vertex_client.py
index 6b950371..0bd13ed0 100644
--- a/letta/llm_api/google_vertex_client.py
+++ b/letta/llm_api/google_vertex_client.py
@@ -872,6 +872,16 @@ class GoogleVertexClient(LLMClientBase):
                 details={"cause": str(e.__cause__) if e.__cause__ else None},
             )
 
+        # Handle httpx network errors which can occur during streaming
+        # when the connection is unexpectedly closed while reading/writing
+        if isinstance(e, (httpx.ReadError, httpx.WriteError, httpx.ConnectError)):
+            logger.warning(f"{self._provider_prefix()} Network error during streaming: {type(e).__name__}: {e}")
+            return LLMConnectionError(
+                message=f"Network error during {self._provider_name()} streaming: {str(e)}",
+                code=ErrorCode.INTERNAL_SERVER_ERROR,
+                details={"cause": str(e.__cause__) if e.__cause__ else None, "error_type": type(e).__name__},
+            )
+
         # Handle connection-related errors
         if "connection" in str(e).lower() or "timeout" in str(e).lower():
             logger.warning(f"{self._provider_prefix()} Connection/timeout error: {e}")
diff --git a/letta/llm_api/openai_client.py b/letta/llm_api/openai_client.py
index e2760c9e..03de4ca4 100644
--- a/letta/llm_api/openai_client.py
+++ b/letta/llm_api/openai_client.py
@@ -972,6 +972,16 @@ class OpenAIClient(LLMClientBase):
                 details={"cause": str(e.__cause__) if e.__cause__ else None},
             )
 
+        # Handle httpx network errors which can occur during streaming
+        # when the connection is unexpectedly closed while reading/writing
+        if isinstance(e, (httpx.ReadError, httpx.WriteError, httpx.ConnectError)):
+            logger.warning(f"[OpenAI] Network error during streaming: {type(e).__name__}: {e}")
+            return LLMConnectionError(
+                message=f"Network error during OpenAI streaming: {str(e)}",
+                code=ErrorCode.INTERNAL_SERVER_ERROR,
+                details={"cause": str(e.__cause__) if e.__cause__ else None, "error_type": type(e).__name__},
+            )
+
         if isinstance(e, openai.RateLimitError):
             logger.warning(f"[OpenAI] Rate limited (429). Consider backoff. Error: {e}")
             return LLMRateLimitError(
diff --git a/tests/adapters/test_letta_llm_stream_adapter_error_handling.py b/tests/adapters/test_letta_llm_stream_adapter_error_handling.py
index a0276757..3241ce7e 100644
--- a/tests/adapters/test_letta_llm_stream_adapter_error_handling.py
+++ b/tests/adapters/test_letta_llm_stream_adapter_error_handling.py
@@ -3,7 +3,7 @@ import httpx
 import pytest
 
 from letta.adapters.letta_llm_stream_adapter import LettaLLMStreamAdapter
-from letta.errors import ContextWindowExceededError, LLMServerError
+from letta.errors import ContextWindowExceededError, LLMConnectionError, LLMServerError
 from letta.llm_api.anthropic_client import AnthropicClient
 from letta.schemas.llm_config import LLMConfig
 
@@ -91,6 +91,74 @@ async def test_letta_llm_stream_adapter_converts_anthropic_413_request_too_large
             pass
 
 
+@pytest.mark.asyncio
+async def test_letta_llm_stream_adapter_converts_httpx_read_error(monkeypatch):
+    """Regression: httpx.ReadError raised during streaming should be converted to LLMConnectionError."""
+
+    class FakeAsyncStream:
+        """Mimics anthropic.AsyncStream enough for AnthropicStreamingInterface (async cm + async iterator)."""
+
+        async def __aenter__(self):
+            return self
+
+        async def __aexit__(self, exc_type, exc, tb):
+            return None
+
+        def __aiter__(self):
+            return self
+
+        async def __anext__(self):
+            raise httpx.ReadError("Connection closed unexpectedly")
+
+    async def fake_stream_async(self, request_data: dict, llm_config: LLMConfig):
+        return FakeAsyncStream()
+
+    monkeypatch.setattr(AnthropicClient, "stream_async", fake_stream_async, raising=True)
+
+    llm_client = AnthropicClient()
+    llm_config = LLMConfig(model="claude-sonnet-4-5-20250929", model_endpoint_type="anthropic", context_window=200000)
+    adapter = LettaLLMStreamAdapter(llm_client=llm_client, llm_config=llm_config)
+
+    gen = adapter.invoke_llm(request_data={}, messages=[], tools=[], use_assistant_message=True)
+    with pytest.raises(LLMConnectionError):
+        async for _ in gen:
+            pass
+
+
+@pytest.mark.asyncio
+async def test_letta_llm_stream_adapter_converts_httpx_write_error(monkeypatch):
+    """Regression: httpx.WriteError raised during streaming should be converted to LLMConnectionError."""
+
+    class FakeAsyncStream:
+        """Mimics anthropic.AsyncStream enough for AnthropicStreamingInterface (async cm + async iterator)."""
+
+        async def __aenter__(self):
+            return self
+
+        async def __aexit__(self, exc_type, exc, tb):
+            return None
+
+        def __aiter__(self):
+            return self
+
+        async def __anext__(self):
+            raise httpx.WriteError("Failed to write to connection")
+
+    async def fake_stream_async(self, request_data: dict, llm_config: LLMConfig):
+        return FakeAsyncStream()
+
+    monkeypatch.setattr(AnthropicClient, "stream_async", fake_stream_async, raising=True)
+
+    llm_client = AnthropicClient()
+    llm_config = LLMConfig(model="claude-sonnet-4-5-20250929", model_endpoint_type="anthropic", context_window=200000)
+    adapter = LettaLLMStreamAdapter(llm_client=llm_client, llm_config=llm_config)
+
+    gen = adapter.invoke_llm(request_data={}, messages=[], tools=[], use_assistant_message=True)
+    with pytest.raises(LLMConnectionError):
+        async for _ in gen:
+            pass
+
+
 def test_anthropic_client_handle_llm_error_413_status_code():
     """Test that handle_llm_error correctly converts 413 status code to ContextWindowExceededError."""
     client = AnthropicClient()