From 6f746c5225c651c094947280c751bffc46421e8a Mon Sep 17 00:00:00 2001 From: Kian Jones <11655409+kianjones9@users.noreply.github.com> Date: Thu, 5 Feb 2026 16:00:36 -0800 Subject: [PATCH] fix(core): handle Anthropic overloaded errors and Unicode encoding issues (#9305) * fix: handle Anthropic overloaded_error in streaming interfaces * fix: handle Unicode surrogates in OpenAI requests Sanitize lone Unicode surrogates before sending requests to OpenAI API. Surrogate code points (U+D800-U+DFFF) are UTF-16 encoding artifacts that, when unpaired, cause UnicodeEncodeError when encoding to UTF-8. Fixes Datadog error: 'utf-8' codec can't encode character '\ud83c' in position 326605: surrogates not allowed * fix: handle UnicodeEncodeError from lone Unicode surrogates in OpenAI requests Improved sanitize_unicode_surrogates() to explicitly filter out lone surrogate characters (U+D800 to U+DFFF) which are invalid in UTF-8. Previous implementation used errors='ignore' which could still fail in edge cases. New approach directly checks Unicode code points and removes any surrogates before data reaches httpx encoding. 
Fixes: 'utf-8' codec can't encode character '\ud83c' in position X: surrogates not allowed --- letta/adapters/letta_llm_stream_adapter.py | 11 ++++- letta/helpers/json_helpers.py | 47 +++++++++++++++++++ letta/helpers/tpuf_client.py | 7 ++- ..._parallel_tool_call_streaming_interface.py | 8 +++- .../anthropic_streaming_interface.py | 8 +++- letta/llm_api/openai_client.py | 13 +++++ letta/schemas/providers/sglang.py | 30 +++--------- letta/services/mcp/sse_client.py | 4 +- letta/services/streaming_service.py | 1 - tests/integration_test_summarizer.py | 2 +- tests/mcp_tests/test_schema_validator.py | 1 - tests/test_provider_trace_summarization.py | 2 +- 12 files changed, 98 insertions(+), 36 deletions(-) diff --git a/letta/adapters/letta_llm_stream_adapter.py b/letta/adapters/letta_llm_stream_adapter.py index 4ef7373e..2bb7ed9c 100644 --- a/letta/adapters/letta_llm_stream_adapter.py +++ b/letta/adapters/letta_llm_stream_adapter.py @@ -37,7 +37,16 @@ class LettaLLMStreamAdapter(LettaLLMAdapter): org_id: str | None = None, user_id: str | None = None, ) -> None: - super().__init__(llm_client, llm_config, call_type=call_type, agent_id=agent_id, agent_tags=agent_tags, run_id=run_id, org_id=org_id, user_id=user_id) + super().__init__( + llm_client, + llm_config, + call_type=call_type, + agent_id=agent_id, + agent_tags=agent_tags, + run_id=run_id, + org_id=org_id, + user_id=user_id, + ) self.interface: OpenAIStreamingInterface | AnthropicStreamingInterface | None = None async def invoke_llm( diff --git a/letta/helpers/json_helpers.py b/letta/helpers/json_helpers.py index 27f8b735..35f48d0e 100644 --- a/letta/helpers/json_helpers.py +++ b/letta/helpers/json_helpers.py @@ -4,6 +4,53 @@ from datetime import datetime from typing import Any +def sanitize_unicode_surrogates(value: Any) -> Any: + """Recursively remove invalid Unicode surrogate characters from strings. 
+ + Unicode surrogate pairs (U+D800 to U+DFFF) are used internally by UTF-16 encoding + but are invalid as standalone characters in UTF-8. When present, they cause + UnicodeEncodeError when encoding to UTF-8, breaking API requests that need to + serialize data to JSON. + + This function sanitizes: + - Strings: removes unpaired surrogates that can't be encoded to UTF-8 + - Dicts: recursively sanitizes all string values + - Lists: recursively sanitizes all elements + - Other types: returned as-is + + Args: + value: The value to sanitize + + Returns: + The sanitized value with surrogate characters removed from all strings + """ + if isinstance(value, str): + # Remove lone surrogate characters (U+D800 to U+DFFF) which are invalid in UTF-8 + # Using character filtering is more reliable than encode/decode for edge cases + try: + # Filter out any character in the surrogate range + return "".join(char for char in value if not (0xD800 <= ord(char) <= 0xDFFF)) + except Exception: + # Fallback: try encode with errors="replace" which replaces surrogates with � + try: + return value.encode("utf-8", errors="replace").decode("utf-8") + except Exception: + # Last resort: return original (should never reach here) + return value + elif isinstance(value, dict): + # Recursively sanitize dictionary keys and values + return {sanitize_unicode_surrogates(k): sanitize_unicode_surrogates(v) for k, v in value.items()} + elif isinstance(value, list): + # Recursively sanitize list elements + return [sanitize_unicode_surrogates(item) for item in value] + elif isinstance(value, tuple): + # Recursively sanitize tuple elements (return as tuple) + return tuple(sanitize_unicode_surrogates(item) for item in value) + else: + # Return other types as-is (int, float, bool, None, etc.) + return value + + def sanitize_null_bytes(value: Any) -> Any: """Recursively remove null bytes (0x00) from strings. 
diff --git a/letta/helpers/tpuf_client.py b/letta/helpers/tpuf_client.py index 17ac59fa..6c2fb6cc 100644 --- a/letta/helpers/tpuf_client.py +++ b/letta/helpers/tpuf_client.py @@ -12,7 +12,7 @@ import httpx from letta.constants import DEFAULT_EMBEDDING_CHUNK_SIZE from letta.errors import LettaInvalidArgumentError -from letta.otel.tracing import trace_method, log_event +from letta.otel.tracing import log_event, trace_method from letta.schemas.embedding_config import EmbeddingConfig from letta.schemas.enums import MessageRole, TagMatchMode from letta.schemas.passage import Passage as PydanticPassage @@ -136,9 +136,7 @@ def async_retry_with_backoff( "function": func.__name__, }, ) - logger.error( - f"Turbopuffer operation '{func.__name__}' failed after {max_retries} retries: {e}" - ) + logger.error(f"Turbopuffer operation '{func.__name__}' failed after {max_retries} retries: {e}") raise # Wait with exponential backoff @@ -153,6 +151,7 @@ def async_retry_with_backoff( return decorator + # Global semaphore for Turbopuffer operations to prevent overwhelming the service # This is separate from embedding semaphore since Turbopuffer can handle more concurrency _GLOBAL_TURBOPUFFER_SEMAPHORE = asyncio.Semaphore(5) diff --git a/letta/interfaces/anthropic_parallel_tool_call_streaming_interface.py b/letta/interfaces/anthropic_parallel_tool_call_streaming_interface.py index 0c13a727..648007f2 100644 --- a/letta/interfaces/anthropic_parallel_tool_call_streaming_interface.py +++ b/letta/interfaces/anthropic_parallel_tool_call_streaming_interface.py @@ -274,7 +274,13 @@ class SimpleAnthropicStreamingInterface: attributes={"stop_reason": StopReasonType.error.value, "error": str(e), "stacktrace": traceback.format_exc()}, ) yield LettaStopReason(stop_reason=StopReasonType.error) - raise e + + # Transform Anthropic errors into our custom error types for consistent handling + from letta.llm_api.anthropic_client import AnthropicClient + + client = AnthropicClient() + transformed_error = 
client.handle_llm_error(e) + raise transformed_error finally: logger.info("AnthropicStreamingInterface: Stream processing complete.") diff --git a/letta/interfaces/anthropic_streaming_interface.py b/letta/interfaces/anthropic_streaming_interface.py index fa1fdefa..b5ffe9b5 100644 --- a/letta/interfaces/anthropic_streaming_interface.py +++ b/letta/interfaces/anthropic_streaming_interface.py @@ -263,7 +263,13 @@ class AnthropicStreamingInterface: attributes={"stop_reason": StopReasonType.error.value, "error": str(e), "stacktrace": traceback.format_exc()}, ) yield LettaStopReason(stop_reason=StopReasonType.error) - raise e + + # Transform Anthropic errors into our custom error types for consistent handling + from letta.llm_api.anthropic_client import AnthropicClient + + client = AnthropicClient() + transformed_error = client.handle_llm_error(e) + raise transformed_error finally: logger.info("AnthropicStreamingInterface: Stream processing complete.") diff --git a/letta/llm_api/openai_client.py b/letta/llm_api/openai_client.py index d7cce539..44aec1ff 100644 --- a/letta/llm_api/openai_client.py +++ b/letta/llm_api/openai_client.py @@ -27,6 +27,7 @@ from letta.errors import ( LLMTimeoutError, LLMUnprocessableEntityError, ) +from letta.helpers.json_helpers import sanitize_unicode_surrogates from letta.llm_api.error_utils import is_context_window_overflow_message from letta.llm_api.helpers import ( add_inner_thoughts_to_functions, @@ -587,6 +588,9 @@ class OpenAIClient(LLMClientBase): """ Performs underlying synchronous request to OpenAI API and returns raw response dict. 
""" + # Sanitize Unicode surrogates to prevent encoding errors + request_data = sanitize_unicode_surrogates(request_data) + client = OpenAI(**self._prepare_client_kwargs(llm_config)) # Route based on payload shape: Responses uses 'input', Chat Completions uses 'messages' if "input" in request_data and "messages" not in request_data: @@ -601,6 +605,9 @@ class OpenAIClient(LLMClientBase): """ Performs underlying asynchronous request to OpenAI API and returns raw response dict. """ + # Sanitize Unicode surrogates to prevent encoding errors + request_data = sanitize_unicode_surrogates(request_data) + kwargs = await self._prepare_client_kwargs_async(llm_config) client = AsyncOpenAI(**kwargs) # Route based on payload shape: Responses uses 'input', Chat Completions uses 'messages' @@ -805,6 +812,9 @@ class OpenAIClient(LLMClientBase): """ Performs underlying asynchronous streaming request to OpenAI and returns the async stream iterator. """ + # Sanitize Unicode surrogates to prevent encoding errors + request_data = sanitize_unicode_surrogates(request_data) + kwargs = await self._prepare_client_kwargs_async(llm_config) client = AsyncOpenAI(**kwargs) @@ -836,6 +846,9 @@ class OpenAIClient(LLMClientBase): """ Performs underlying asynchronous streaming request to OpenAI and returns the async stream iterator. 
""" + # Sanitize Unicode surrogates to prevent encoding errors + request_data = sanitize_unicode_surrogates(request_data) + kwargs = await self._prepare_client_kwargs_async(llm_config) client = AsyncOpenAI(**kwargs) response_stream: AsyncStream[ResponseStreamEvent] = await client.responses.create(**request_data, stream=True) diff --git a/letta/schemas/providers/sglang.py b/letta/schemas/providers/sglang.py index 657c2e38..5cae03c6 100644 --- a/letta/schemas/providers/sglang.py +++ b/letta/schemas/providers/sglang.py @@ -15,30 +15,12 @@ from letta.schemas.providers.base import Provider class SGLangProvider(Provider): - provider_type: Literal[ProviderType.sglang] = Field( - ProviderType.sglang, - description="The type of the provider." - ) - provider_category: ProviderCategory = Field( - ProviderCategory.base, - description="The category of the provider (base or byok)" - ) - base_url: str = Field( - ..., - description="Base URL for the SGLang API (e.g., http://localhost:30000)." - ) - api_key: str | None = Field( - None, - description="API key for the SGLang API (optional for local instances)." - ) - default_prompt_formatter: str | None = Field( - default=None, - description="Default prompt formatter (aka model wrapper)." - ) - handle_base: str | None = Field( - None, - description="Custom handle base name for model handles." 
- ) + provider_type: Literal[ProviderType.sglang] = Field(ProviderType.sglang, description="The type of the provider.") + provider_category: ProviderCategory = Field(ProviderCategory.base, description="The category of the provider (base or byok)") + base_url: str = Field(..., description="Base URL for the SGLang API (e.g., http://localhost:30000).") + api_key: str | None = Field(None, description="API key for the SGLang API (optional for local instances).") + default_prompt_formatter: str | None = Field(default=None, description="Default prompt formatter (aka model wrapper).") + handle_base: str | None = Field(None, description="Custom handle base name for model handles.") async def list_llm_models_async(self) -> list[LLMConfig]: from letta.llm_api.openai import openai_get_model_list_async diff --git a/letta/services/mcp/sse_client.py b/letta/services/mcp/sse_client.py index ee8dfc17..1c4660df 100644 --- a/letta/services/mcp/sse_client.py +++ b/letta/services/mcp/sse_client.py @@ -37,7 +37,9 @@ class AsyncSSEMCPClient(AsyncBaseMCPClient): # Pass timeout to prevent httpx.ReadTimeout errors on slow connections timeout = tool_settings.mcp_connect_to_server_timeout if self.oauth_provider: - sse_cm = sse_client(url=server_config.server_url, headers=headers if headers else None, auth=self.oauth_provider, timeout=timeout) + sse_cm = sse_client( + url=server_config.server_url, headers=headers if headers else None, auth=self.oauth_provider, timeout=timeout + ) else: sse_cm = sse_client(url=server_config.server_url, headers=headers if headers else None, timeout=timeout) diff --git a/letta/services/streaming_service.py b/letta/services/streaming_service.py index 32408d47..85fca482 100644 --- a/letta/services/streaming_service.py +++ b/letta/services/streaming_service.py @@ -149,7 +149,6 @@ class StreamingService: client_tools=request.client_tools, include_compaction_messages=request.include_compaction_messages, ) - # handle background streaming if requested if 
request.background and settings.track_agent_run: diff --git a/tests/integration_test_summarizer.py b/tests/integration_test_summarizer.py index df0bf4c0..22a98995 100644 --- a/tests/integration_test_summarizer.py +++ b/tests/integration_test_summarizer.py @@ -1157,7 +1157,7 @@ async def test_sliding_window_cutoff_index_does_not_exceed_message_count(server: summary, remaining_messages = await summarize_via_sliding_window( actor=actor, llm_config=llm_config, - agent_llm_config=llm_config, # case where agent and summarizer have same config + agent_llm_config=llm_config, # case where agent and summarizer have same config summarizer_config=summarizer_config, in_context_messages=messages, ) diff --git a/tests/mcp_tests/test_schema_validator.py b/tests/mcp_tests/test_schema_validator.py index c9dd66b3..80d93ab2 100644 --- a/tests/mcp_tests/test_schema_validator.py +++ b/tests/mcp_tests/test_schema_validator.py @@ -229,7 +229,6 @@ class TestSchemaValidator: assert status == SchemaHealth.STRICT_COMPLIANT assert reasons == [] - def test_root_level_without_required_non_strict(self): """Test that root-level objects without 'required' field are STRICT_COMPLIANT (validator is relaxed).""" schema = { diff --git a/tests/test_provider_trace_summarization.py b/tests/test_provider_trace_summarization.py index 86b10a5e..c21e1e02 100644 --- a/tests/test_provider_trace_summarization.py +++ b/tests/test_provider_trace_summarization.py @@ -210,7 +210,7 @@ class TestSummarizeSlidingWindowTelemetryContext: await summarizer_sliding_window.summarize_via_sliding_window( actor=mock_actor, llm_config=mock_llm_config, - agent_llm_config=mock_llm_config, # case where agent and summarizer have same config + agent_llm_config=mock_llm_config, # case where agent and summarizer have same config summarizer_config=mock_compaction_settings, in_context_messages=mock_messages, agent_id=agent_id,