fix(core): handle Anthropic overloaded errors and Unicode encoding issues (#9305)
* fix: handle Anthropic overloaded_error in streaming interfaces

* fix: handle Unicode surrogates in OpenAI requests

  Sanitize Unicode surrogate pairs before sending requests to OpenAI API. Surrogate pairs (U+D800-U+DFFF) are UTF-16 encoding artifacts that cause UnicodeEncodeError when encoding to UTF-8.

  Fixes Datadog error: 'utf-8' codec can't encode character '\ud83c' in position 326605: surrogates not allowed

* fix: handle UnicodeEncodeError from lone Unicode surrogates in OpenAI requests

  Improved sanitize_unicode_surrogates() to explicitly filter out lone surrogate characters (U+D800 to U+DFFF) which are invalid in UTF-8. Previous implementation used errors='ignore' which could still fail in edge cases. New approach directly checks Unicode code points and removes any surrogates before data reaches httpx encoding.

  Also added sanitization to stream_async_responses() method which was missing it.

  Fixes: 'utf-8' codec can't encode character '\ud83c' in position X: surrogates not allowed
This commit is contained in:
@@ -37,7 +37,16 @@ class LettaLLMStreamAdapter(LettaLLMAdapter):
|
||||
org_id: str | None = None,
|
||||
user_id: str | None = None,
|
||||
) -> None:
|
||||
super().__init__(llm_client, llm_config, call_type=call_type, agent_id=agent_id, agent_tags=agent_tags, run_id=run_id, org_id=org_id, user_id=user_id)
|
||||
super().__init__(
|
||||
llm_client,
|
||||
llm_config,
|
||||
call_type=call_type,
|
||||
agent_id=agent_id,
|
||||
agent_tags=agent_tags,
|
||||
run_id=run_id,
|
||||
org_id=org_id,
|
||||
user_id=user_id,
|
||||
)
|
||||
self.interface: OpenAIStreamingInterface | AnthropicStreamingInterface | None = None
|
||||
|
||||
async def invoke_llm(
|
||||
|
||||
@@ -4,6 +4,53 @@ from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
|
||||
def sanitize_unicode_surrogates(value: Any) -> Any:
    """Recursively remove Unicode surrogate code points from strings.

    Code points in the range U+D800 to U+DFFF are reserved for UTF-16 surrogate
    pairs and cannot be encoded to UTF-8. A ``str`` containing one raises
    ``UnicodeEncodeError`` when encoded, which breaks API requests whose payload
    has to be serialized to UTF-8 JSON.

    Handling by type:
        - ``str``: every surrogate code point is dropped. A string that holds
          no surrogates is returned unchanged (the same object).
        - ``dict``: keys and values are both sanitized recursively. Keys that
          differ only by surrogate characters collapse into a single entry.
        - ``list``: a new list with every element sanitized.
        - ``tuple``: a new plain tuple with every element sanitized.
        - Anything else (int, float, bool, None, bytes, ...): returned as-is.

    Args:
        value: The value to sanitize.

    Returns:
        The sanitized value, with surrogate code points removed from every
        string reachable through dicts, lists and tuples.
    """
    if isinstance(value, str):
        try:
            # Strict UTF-8 encoding fails only on surrogate code points, so a
            # successful encode proves the string is already clean. This check
            # runs in C and spares large payloads a per-character Python loop.
            value.encode("utf-8")
        except UnicodeEncodeError:
            # At least one surrogate is present: drop every code point in the
            # surrogate range and keep everything else untouched.
            return "".join(ch for ch in value if not 0xD800 <= ord(ch) <= 0xDFFF)
        return value

    if isinstance(value, dict):
        return {sanitize_unicode_surrogates(k): sanitize_unicode_surrogates(v) for k, v in value.items()}

    if isinstance(value, list):
        return [sanitize_unicode_surrogates(item) for item in value]

    if isinstance(value, tuple):
        return tuple(sanitize_unicode_surrogates(item) for item in value)

    # Non-container, non-string values carry no text to sanitize.
    return value
|
||||
|
||||
|
||||
def sanitize_null_bytes(value: Any) -> Any:
|
||||
"""Recursively remove null bytes (0x00) from strings.
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ import httpx
|
||||
|
||||
from letta.constants import DEFAULT_EMBEDDING_CHUNK_SIZE
|
||||
from letta.errors import LettaInvalidArgumentError
|
||||
from letta.otel.tracing import trace_method, log_event
|
||||
from letta.otel.tracing import log_event, trace_method
|
||||
from letta.schemas.embedding_config import EmbeddingConfig
|
||||
from letta.schemas.enums import MessageRole, TagMatchMode
|
||||
from letta.schemas.passage import Passage as PydanticPassage
|
||||
@@ -136,9 +136,7 @@ def async_retry_with_backoff(
|
||||
"function": func.__name__,
|
||||
},
|
||||
)
|
||||
logger.error(
|
||||
f"Turbopuffer operation '{func.__name__}' failed after {max_retries} retries: {e}"
|
||||
)
|
||||
logger.error(f"Turbopuffer operation '{func.__name__}' failed after {max_retries} retries: {e}")
|
||||
raise
|
||||
|
||||
# Wait with exponential backoff
|
||||
@@ -153,6 +151,7 @@ def async_retry_with_backoff(
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
# Global semaphore for Turbopuffer operations to prevent overwhelming the service
|
||||
# This is separate from embedding semaphore since Turbopuffer can handle more concurrency
|
||||
_GLOBAL_TURBOPUFFER_SEMAPHORE = asyncio.Semaphore(5)
|
||||
|
||||
@@ -274,7 +274,13 @@ class SimpleAnthropicStreamingInterface:
|
||||
attributes={"stop_reason": StopReasonType.error.value, "error": str(e), "stacktrace": traceback.format_exc()},
|
||||
)
|
||||
yield LettaStopReason(stop_reason=StopReasonType.error)
|
||||
raise e
|
||||
|
||||
# Transform Anthropic errors into our custom error types for consistent handling
|
||||
from letta.llm_api.anthropic_client import AnthropicClient
|
||||
|
||||
client = AnthropicClient()
|
||||
transformed_error = client.handle_llm_error(e)
|
||||
raise transformed_error
|
||||
finally:
|
||||
logger.info("AnthropicStreamingInterface: Stream processing complete.")
|
||||
|
||||
|
||||
@@ -263,7 +263,13 @@ class AnthropicStreamingInterface:
|
||||
attributes={"stop_reason": StopReasonType.error.value, "error": str(e), "stacktrace": traceback.format_exc()},
|
||||
)
|
||||
yield LettaStopReason(stop_reason=StopReasonType.error)
|
||||
raise e
|
||||
|
||||
# Transform Anthropic errors into our custom error types for consistent handling
|
||||
from letta.llm_api.anthropic_client import AnthropicClient
|
||||
|
||||
client = AnthropicClient()
|
||||
transformed_error = client.handle_llm_error(e)
|
||||
raise transformed_error
|
||||
finally:
|
||||
logger.info("AnthropicStreamingInterface: Stream processing complete.")
|
||||
|
||||
|
||||
@@ -27,6 +27,7 @@ from letta.errors import (
|
||||
LLMTimeoutError,
|
||||
LLMUnprocessableEntityError,
|
||||
)
|
||||
from letta.helpers.json_helpers import sanitize_unicode_surrogates
|
||||
from letta.llm_api.error_utils import is_context_window_overflow_message
|
||||
from letta.llm_api.helpers import (
|
||||
add_inner_thoughts_to_functions,
|
||||
@@ -587,6 +588,9 @@ class OpenAIClient(LLMClientBase):
|
||||
"""
|
||||
Performs underlying synchronous request to OpenAI API and returns raw response dict.
|
||||
"""
|
||||
# Sanitize Unicode surrogates to prevent encoding errors
|
||||
request_data = sanitize_unicode_surrogates(request_data)
|
||||
|
||||
client = OpenAI(**self._prepare_client_kwargs(llm_config))
|
||||
# Route based on payload shape: Responses uses 'input', Chat Completions uses 'messages'
|
||||
if "input" in request_data and "messages" not in request_data:
|
||||
@@ -601,6 +605,9 @@ class OpenAIClient(LLMClientBase):
|
||||
"""
|
||||
Performs underlying asynchronous request to OpenAI API and returns raw response dict.
|
||||
"""
|
||||
# Sanitize Unicode surrogates to prevent encoding errors
|
||||
request_data = sanitize_unicode_surrogates(request_data)
|
||||
|
||||
kwargs = await self._prepare_client_kwargs_async(llm_config)
|
||||
client = AsyncOpenAI(**kwargs)
|
||||
# Route based on payload shape: Responses uses 'input', Chat Completions uses 'messages'
|
||||
@@ -805,6 +812,9 @@ class OpenAIClient(LLMClientBase):
|
||||
"""
|
||||
Performs underlying asynchronous streaming request to OpenAI and returns the async stream iterator.
|
||||
"""
|
||||
# Sanitize Unicode surrogates to prevent encoding errors
|
||||
request_data = sanitize_unicode_surrogates(request_data)
|
||||
|
||||
kwargs = await self._prepare_client_kwargs_async(llm_config)
|
||||
client = AsyncOpenAI(**kwargs)
|
||||
|
||||
@@ -836,6 +846,9 @@ class OpenAIClient(LLMClientBase):
|
||||
"""
|
||||
Performs underlying asynchronous streaming request to OpenAI and returns the async stream iterator.
|
||||
"""
|
||||
# Sanitize Unicode surrogates to prevent encoding errors
|
||||
request_data = sanitize_unicode_surrogates(request_data)
|
||||
|
||||
kwargs = await self._prepare_client_kwargs_async(llm_config)
|
||||
client = AsyncOpenAI(**kwargs)
|
||||
response_stream: AsyncStream[ResponseStreamEvent] = await client.responses.create(**request_data, stream=True)
|
||||
|
||||
@@ -15,30 +15,12 @@ from letta.schemas.providers.base import Provider
|
||||
|
||||
|
||||
class SGLangProvider(Provider):
|
||||
provider_type: Literal[ProviderType.sglang] = Field(
|
||||
ProviderType.sglang,
|
||||
description="The type of the provider."
|
||||
)
|
||||
provider_category: ProviderCategory = Field(
|
||||
ProviderCategory.base,
|
||||
description="The category of the provider (base or byok)"
|
||||
)
|
||||
base_url: str = Field(
|
||||
...,
|
||||
description="Base URL for the SGLang API (e.g., http://localhost:30000)."
|
||||
)
|
||||
api_key: str | None = Field(
|
||||
None,
|
||||
description="API key for the SGLang API (optional for local instances)."
|
||||
)
|
||||
default_prompt_formatter: str | None = Field(
|
||||
default=None,
|
||||
description="Default prompt formatter (aka model wrapper)."
|
||||
)
|
||||
handle_base: str | None = Field(
|
||||
None,
|
||||
description="Custom handle base name for model handles."
|
||||
)
|
||||
provider_type: Literal[ProviderType.sglang] = Field(ProviderType.sglang, description="The type of the provider.")
|
||||
provider_category: ProviderCategory = Field(ProviderCategory.base, description="The category of the provider (base or byok)")
|
||||
base_url: str = Field(..., description="Base URL for the SGLang API (e.g., http://localhost:30000).")
|
||||
api_key: str | None = Field(None, description="API key for the SGLang API (optional for local instances).")
|
||||
default_prompt_formatter: str | None = Field(default=None, description="Default prompt formatter (aka model wrapper).")
|
||||
handle_base: str | None = Field(None, description="Custom handle base name for model handles.")
|
||||
|
||||
async def list_llm_models_async(self) -> list[LLMConfig]:
|
||||
from letta.llm_api.openai import openai_get_model_list_async
|
||||
|
||||
@@ -37,7 +37,9 @@ class AsyncSSEMCPClient(AsyncBaseMCPClient):
|
||||
# Pass timeout to prevent httpx.ReadTimeout errors on slow connections
|
||||
timeout = tool_settings.mcp_connect_to_server_timeout
|
||||
if self.oauth_provider:
|
||||
sse_cm = sse_client(url=server_config.server_url, headers=headers if headers else None, auth=self.oauth_provider, timeout=timeout)
|
||||
sse_cm = sse_client(
|
||||
url=server_config.server_url, headers=headers if headers else None, auth=self.oauth_provider, timeout=timeout
|
||||
)
|
||||
else:
|
||||
sse_cm = sse_client(url=server_config.server_url, headers=headers if headers else None, timeout=timeout)
|
||||
|
||||
|
||||
@@ -149,7 +149,6 @@ class StreamingService:
|
||||
client_tools=request.client_tools,
|
||||
include_compaction_messages=request.include_compaction_messages,
|
||||
)
|
||||
|
||||
|
||||
# handle background streaming if requested
|
||||
if request.background and settings.track_agent_run:
|
||||
|
||||
@@ -1157,7 +1157,7 @@ async def test_sliding_window_cutoff_index_does_not_exceed_message_count(server:
|
||||
summary, remaining_messages = await summarize_via_sliding_window(
|
||||
actor=actor,
|
||||
llm_config=llm_config,
|
||||
agent_llm_config=llm_config, # case where agent and summarizer have same config
|
||||
agent_llm_config=llm_config, # case where agent and summarizer have same config
|
||||
summarizer_config=summarizer_config,
|
||||
in_context_messages=messages,
|
||||
)
|
||||
|
||||
@@ -229,7 +229,6 @@ class TestSchemaValidator:
|
||||
assert status == SchemaHealth.STRICT_COMPLIANT
|
||||
assert reasons == []
|
||||
|
||||
|
||||
def test_root_level_without_required_non_strict(self):
|
||||
"""Test that root-level objects without 'required' field are STRICT_COMPLIANT (validator is relaxed)."""
|
||||
schema = {
|
||||
|
||||
@@ -210,7 +210,7 @@ class TestSummarizeSlidingWindowTelemetryContext:
|
||||
await summarizer_sliding_window.summarize_via_sliding_window(
|
||||
actor=mock_actor,
|
||||
llm_config=mock_llm_config,
|
||||
agent_llm_config=mock_llm_config, # case where agent and summarizer have same config
|
||||
agent_llm_config=mock_llm_config, # case where agent and summarizer have same config
|
||||
summarizer_config=mock_compaction_settings,
|
||||
in_context_messages=mock_messages,
|
||||
agent_id=agent_id,
|
||||
|
||||
Reference in New Issue
Block a user