diff --git a/letta/llm_api/anthropic_client.py b/letta/llm_api/anthropic_client.py index 102a5941..025d20c6 100644 --- a/letta/llm_api/anthropic_client.py +++ b/letta/llm_api/anthropic_client.py @@ -5,6 +5,7 @@ import re from typing import Dict, List, Optional, Union import anthropic +import httpx from anthropic import AsyncStream from anthropic.types.beta import BetaMessage as AnthropicMessage, BetaRawMessageStreamEvent from anthropic.types.beta.message_create_params import MessageCreateParamsNonStreaming @@ -749,6 +750,17 @@ class AnthropicClient(LLMClientBase): details={"cause": str(e.__cause__) if e.__cause__ else None}, ) + # Handle httpx.RemoteProtocolError which can occur during streaming + # when the remote server closes the connection unexpectedly + # (e.g., "peer closed connection without sending complete message body") + if isinstance(e, httpx.RemoteProtocolError): + logger.warning(f"[Anthropic] Remote protocol error during streaming: {e}") + return LLMConnectionError( + message=f"Connection error during Anthropic streaming: {str(e)}", + code=ErrorCode.INTERNAL_SERVER_ERROR, + details={"cause": str(e.__cause__) if e.__cause__ else None}, + ) + if isinstance(e, anthropic.RateLimitError): logger.warning("[Anthropic] Rate limited (429). Consider backoff.") return LLMRateLimitError( diff --git a/letta/llm_api/google_vertex_client.py b/letta/llm_api/google_vertex_client.py index 3b5ccb6d..6b950371 100644 --- a/letta/llm_api/google_vertex_client.py +++ b/letta/llm_api/google_vertex_client.py @@ -3,6 +3,7 @@ import json import uuid from typing import AsyncIterator, List, Optional +import httpx from google.genai import Client, errors from google.genai.types import ( FunctionCallingConfig, @@ -860,6 +861,17 @@ class GoogleVertexClient(LLMClientBase): }, ) + # Handle httpx.RemoteProtocolError which can occur during streaming + # when the remote server closes the connection unexpectedly + # (e.g., "peer closed connection without sending complete message body") + if isinstance(e, httpx.RemoteProtocolError): + logger.warning(f"{self._provider_prefix()} Remote protocol error during streaming: {e}") + return LLMConnectionError( + message=f"Connection error during {self._provider_name()} streaming: {str(e)}", + code=ErrorCode.INTERNAL_SERVER_ERROR, + details={"cause": str(e.__cause__) if e.__cause__ else None}, + ) + # Handle connection-related errors if "connection" in str(e).lower() or "timeout" in str(e).lower(): logger.warning(f"{self._provider_prefix()} Connection/timeout error: {e}") diff --git a/letta/llm_api/llm_client_base.py b/letta/llm_api/llm_client_base.py index b25df76a..66c04c44 100644 --- a/letta/llm_api/llm_client_base.py +++ b/letta/llm_api/llm_client_base.py @@ -2,11 +2,12 @@ import json from abc import abstractmethod from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union +import httpx from anthropic.types.beta.messages import BetaMessageBatch from openai import AsyncStream, Stream from openai.types.chat.chat_completion_chunk import ChatCompletionChunk -from letta.errors import LLMError +from letta.errors import ErrorCode, LLMConnectionError, LLMError from letta.otel.tracing import log_event, trace_method from letta.schemas.embedding_config import EmbeddingConfig from letta.schemas.enums import AgentType, ProviderCategory @@ -215,6 +216,20 @@ class LLMClientBase: Returns: An LLMError subclass that represents the error in a provider-agnostic way """ + # Handle httpx.RemoteProtocolError which can occur during streaming + # when the remote server closes the connection unexpectedly + # (e.g., "peer closed connection without sending complete message body") + if isinstance(e, httpx.RemoteProtocolError): + from letta.log import get_logger + + logger = get_logger(__name__) + logger.warning(f"[LLM] Remote protocol error during streaming: {e}") + return LLMConnectionError( + message=f"Connection error during streaming: {str(e)}", + code=ErrorCode.INTERNAL_SERVER_ERROR, + details={"cause": str(e.__cause__) if e.__cause__ else None}, + ) + return LLMError(f"Unhandled LLM error: {str(e)}") def get_byok_overrides(self, llm_config: LLMConfig) -> Tuple[Optional[str], Optional[str], Optional[str]]: diff --git a/letta/llm_api/openai_client.py b/letta/llm_api/openai_client.py index 26dd493a..e2760c9e 100644 --- a/letta/llm_api/openai_client.py +++ b/letta/llm_api/openai_client.py @@ -4,6 +4,7 @@ import os import time from typing import Any, List, Optional +import httpx import openai from openai import AsyncOpenAI, AsyncStream, OpenAI from openai.types import Reasoning @@ -960,6 +961,17 @@ class OpenAIClient(LLMClientBase): details={"cause": str(e.__cause__) if e.__cause__ else None}, ) + # Handle httpx.RemoteProtocolError which can occur during streaming + # when the remote server closes the connection unexpectedly + # (e.g., "peer closed connection without sending complete message body") + if isinstance(e, httpx.RemoteProtocolError): + logger.warning(f"[OpenAI] Remote protocol error during streaming: {e}") + return LLMConnectionError( + message=f"Connection error during OpenAI streaming: {str(e)}", + code=ErrorCode.INTERNAL_SERVER_ERROR, + details={"cause": str(e.__cause__) if e.__cause__ else None}, + ) + if isinstance(e, openai.RateLimitError): logger.warning(f"[OpenAI] Rate limited (429). Consider backoff. Error: {e}") return LLMRateLimitError(