fix: handle httpx.RemoteProtocolError during LLM streaming (#8206)
This commit is contained in:
committed by
Caren Thomas
parent
abb325f32d
commit
76008c61f4
@@ -5,6 +5,7 @@ import re
|
||||
from typing import Dict, List, Optional, Union
|
||||
|
||||
import anthropic
|
||||
import httpx
|
||||
from anthropic import AsyncStream
|
||||
from anthropic.types.beta import BetaMessage as AnthropicMessage, BetaRawMessageStreamEvent
|
||||
from anthropic.types.beta.message_create_params import MessageCreateParamsNonStreaming
|
||||
@@ -749,6 +750,17 @@ class AnthropicClient(LLMClientBase):
|
||||
details={"cause": str(e.__cause__) if e.__cause__ else None},
|
||||
)
|
||||
|
||||
# Handle httpx.RemoteProtocolError which can occur during streaming
|
||||
# when the remote server closes the connection unexpectedly
|
||||
# (e.g., "peer closed connection without sending complete message body")
|
||||
if isinstance(e, httpx.RemoteProtocolError):
|
||||
logger.warning(f"[Anthropic] Remote protocol error during streaming: {e}")
|
||||
return LLMConnectionError(
|
||||
message=f"Connection error during Anthropic streaming: {str(e)}",
|
||||
code=ErrorCode.INTERNAL_SERVER_ERROR,
|
||||
details={"cause": str(e.__cause__) if e.__cause__ else None},
|
||||
)
|
||||
|
||||
if isinstance(e, anthropic.RateLimitError):
|
||||
logger.warning("[Anthropic] Rate limited (429). Consider backoff.")
|
||||
return LLMRateLimitError(
|
||||
|
||||
@@ -3,6 +3,7 @@ import json
|
||||
import uuid
|
||||
from typing import AsyncIterator, List, Optional
|
||||
|
||||
import httpx
|
||||
from google.genai import Client, errors
|
||||
from google.genai.types import (
|
||||
FunctionCallingConfig,
|
||||
@@ -860,6 +861,17 @@ class GoogleVertexClient(LLMClientBase):
|
||||
},
|
||||
)
|
||||
|
||||
# Handle httpx.RemoteProtocolError which can occur during streaming
|
||||
# when the remote server closes the connection unexpectedly
|
||||
# (e.g., "peer closed connection without sending complete message body")
|
||||
if isinstance(e, httpx.RemoteProtocolError):
|
||||
logger.warning(f"{self._provider_prefix()} Remote protocol error during streaming: {e}")
|
||||
return LLMConnectionError(
|
||||
message=f"Connection error during {self._provider_name()} streaming: {str(e)}",
|
||||
code=ErrorCode.INTERNAL_SERVER_ERROR,
|
||||
details={"cause": str(e.__cause__) if e.__cause__ else None},
|
||||
)
|
||||
|
||||
# Handle connection-related errors
|
||||
if "connection" in str(e).lower() or "timeout" in str(e).lower():
|
||||
logger.warning(f"{self._provider_prefix()} Connection/timeout error: {e}")
|
||||
|
||||
@@ -2,11 +2,12 @@ import json
|
||||
from abc import abstractmethod
|
||||
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union
|
||||
|
||||
import httpx
|
||||
from anthropic.types.beta.messages import BetaMessageBatch
|
||||
from openai import AsyncStream, Stream
|
||||
from openai.types.chat.chat_completion_chunk import ChatCompletionChunk
|
||||
|
||||
from letta.errors import LLMError
|
||||
from letta.errors import ErrorCode, LLMConnectionError, LLMError
|
||||
from letta.otel.tracing import log_event, trace_method
|
||||
from letta.schemas.embedding_config import EmbeddingConfig
|
||||
from letta.schemas.enums import AgentType, ProviderCategory
|
||||
@@ -215,6 +216,20 @@ class LLMClientBase:
|
||||
Returns:
|
||||
An LLMError subclass that represents the error in a provider-agnostic way
|
||||
"""
|
||||
# Handle httpx.RemoteProtocolError which can occur during streaming
|
||||
# when the remote server closes the connection unexpectedly
|
||||
# (e.g., "peer closed connection without sending complete message body")
|
||||
if isinstance(e, httpx.RemoteProtocolError):
|
||||
from letta.log import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
logger.warning(f"[LLM] Remote protocol error during streaming: {e}")
|
||||
return LLMConnectionError(
|
||||
message=f"Connection error during streaming: {str(e)}",
|
||||
code=ErrorCode.INTERNAL_SERVER_ERROR,
|
||||
details={"cause": str(e.__cause__) if e.__cause__ else None},
|
||||
)
|
||||
|
||||
return LLMError(f"Unhandled LLM error: {str(e)}")
|
||||
|
||||
def get_byok_overrides(self, llm_config: LLMConfig) -> Tuple[Optional[str], Optional[str], Optional[str]]:
|
||||
|
||||
@@ -4,6 +4,7 @@ import os
|
||||
import time
|
||||
from typing import Any, List, Optional
|
||||
|
||||
import httpx
|
||||
import openai
|
||||
from openai import AsyncOpenAI, AsyncStream, OpenAI
|
||||
from openai.types import Reasoning
|
||||
@@ -960,6 +961,17 @@ class OpenAIClient(LLMClientBase):
|
||||
details={"cause": str(e.__cause__) if e.__cause__ else None},
|
||||
)
|
||||
|
||||
# Handle httpx.RemoteProtocolError which can occur during streaming
|
||||
# when the remote server closes the connection unexpectedly
|
||||
# (e.g., "peer closed connection without sending complete message body")
|
||||
if isinstance(e, httpx.RemoteProtocolError):
|
||||
logger.warning(f"[OpenAI] Remote protocol error during streaming: {e}")
|
||||
return LLMConnectionError(
|
||||
message=f"Connection error during OpenAI streaming: {str(e)}",
|
||||
code=ErrorCode.INTERNAL_SERVER_ERROR,
|
||||
details={"cause": str(e.__cause__) if e.__cause__ else None},
|
||||
)
|
||||
|
||||
if isinstance(e, openai.RateLimitError):
|
||||
logger.warning(f"[OpenAI] Rate limited (429). Consider backoff. Error: {e}")
|
||||
return LLMRateLimitError(
|
||||
|
||||
Reference in New Issue
Block a user