From 18029250d04a07c22344f7c6110925dd28176d1c Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Fri, 7 Nov 2025 01:21:26 -0800 Subject: [PATCH] fix(core): sanitize messages to anthropic in the main path the same way (or similar) to how we do it in the token counter (#6044) * fix(core): sanitize messages to anthropic in the main path the same way (or similar) to how we do it in the token counter * fix: also patch poison error in backend by filtering lazily * fix: remap streaming errors (what the fuck) * fix: dedupe tool calls * fix: cleanup, removed try/catch --- letta/adapters/letta_llm_stream_adapter.py | 7 +- letta/adapters/simple_llm_stream_adapter.py | 7 +- letta/llm_api/anthropic_client.py | 114 ++++++++++++++++++ letta/schemas/message.py | 121 ++++++++++++++++++++ 4 files changed, 245 insertions(+), 4 deletions(-) diff --git a/letta/adapters/letta_llm_stream_adapter.py b/letta/adapters/letta_llm_stream_adapter.py index f8408e3e..23800b23 100644 --- a/letta/adapters/letta_llm_stream_adapter.py +++ b/letta/adapters/letta_llm_stream_adapter.py @@ -79,8 +79,11 @@ class LettaLLMStreamAdapter(LettaLLMAdapter): # Extract optional parameters # ttft_span = kwargs.get('ttft_span', None) - # Start the streaming request - stream = await self.llm_client.stream_async(request_data, self.llm_config) + # Start the streaming request (map provider errors to common LLMError types) + try: + stream = await self.llm_client.stream_async(request_data, self.llm_config) + except Exception as e: + raise self.llm_client.handle_llm_error(e) # Process the stream and yield chunks immediately for TTFT async for chunk in self.interface.process(stream): # TODO: add ttft span diff --git a/letta/adapters/simple_llm_stream_adapter.py b/letta/adapters/simple_llm_stream_adapter.py index ffdd4547..bb8b98f9 100644 --- a/letta/adapters/simple_llm_stream_adapter.py +++ b/letta/adapters/simple_llm_stream_adapter.py @@ -110,8 +110,11 @@ class SimpleLLMStreamAdapter(LettaLLMStreamAdapter): # Extract 
optional parameters # ttft_span = kwargs.get('ttft_span', None) - # Start the streaming request - stream = await self.llm_client.stream_async(request_data, self.llm_config) + # Start the streaming request (map provider errors to common LLMError types) + try: + stream = await self.llm_client.stream_async(request_data, self.llm_config) + except Exception as e: + raise self.llm_client.handle_llm_error(e) # Process the stream and yield chunks immediately for TTFT async for chunk in self.interface.process(stream): # TODO: add ttft span diff --git a/letta/llm_api/anthropic_client.py b/letta/llm_api/anthropic_client.py index 297cf56b..a8c97f97 100644 --- a/letta/llm_api/anthropic_client.py +++ b/letta/llm_api/anthropic_client.py @@ -363,6 +363,11 @@ class AnthropicClient(LLMClientBase): if llm_config.enable_reasoner: data["messages"] = merge_heartbeats_into_tool_responses(data["messages"]) + # Deduplicate tool_result blocks that reference the same tool_use_id within a single user message + # Anthropic requires a single result per tool_use. Merging consecutive user messages can accidentally + # produce multiple tool_result blocks with the same id; consolidate them here. + data["messages"] = dedupe_tool_results_in_user_messages(data["messages"]) + # Prefix fill # https://docs.anthropic.com/en/api/messages#body-messages # NOTE: cannot prefill with tools for opus: @@ -373,6 +378,61 @@ class AnthropicClient(LLMClientBase): {"role": "assistant", "content": f"<{inner_thoughts_xml_tag}>"}, ) + # As a final safeguard for request payloads: drop empty messages (instead of inserting placeholders) + # to avoid changing conversational meaning. Preserve an optional final assistant prefill if present. 
+ if data.get("messages"): + sanitized_messages = [] + dropped_messages = [] + empty_blocks_removed = 0 + total = len(data["messages"]) + for i, msg in enumerate(data["messages"]): + role = msg.get("role") + content = msg.get("content") + is_final_assistant = i == total - 1 and role == "assistant" + + # If content is a list, drop empty text blocks but keep non-text blocks + if isinstance(content, list) and len(content) > 0: + new_blocks = [] + for block in content: + if isinstance(block, dict) and block.get("type") == "text": + if block.get("text", "").strip(): + new_blocks.append(block) + else: + empty_blocks_removed += 1 + else: + new_blocks.append(block) + msg["content"] = new_blocks + content = new_blocks + + # Determine emptiness after trimming blocks + is_empty = ( + content is None + or (isinstance(content, str) and not content.strip()) + or (isinstance(content, list) and len(content) == 0) + ) + + # Drop empty messages except an allowed final assistant prefill + if is_empty and not is_final_assistant: + dropped_messages.append({"index": i, "role": role}) + continue + sanitized_messages.append(msg) + + data["messages"] = sanitized_messages + + # Log unexpected sanitation events for visibility + if dropped_messages or empty_blocks_removed > 0: + logger.error( + "[Anthropic] Sanitized request messages: dropped=%s, empty_text_blocks_removed=%s, model=%s", + dropped_messages, + empty_blocks_removed, + data.get("model"), + ) + + # Ensure first message is user after sanitation + if not data["messages"] or data["messages"][0].get("role") != "user": + logger.error("[Anthropic] Inserting dummy first user message after sanitation to satisfy API constraints") + data["messages"] = [{"role": "user", "content": DUMMY_FIRST_USER_MESSAGE}] + data["messages"] + return data async def count_tokens(self, messages: List[dict] = None, model: str = None, tools: List[OpenAITool] = None) -> int: @@ -1048,6 +1108,60 @@ def merge_tool_results_into_user_messages(messages: List[dict]): 
def _merge_tool_result_content(existing, extra):
    """Return the combined content of two tool_result blocks sharing a tool_use_id.

    Args:
        existing: Content already held by the block being kept (str, list of
            content blocks, or missing/empty).
        extra: Non-empty content of the duplicate block being folded in.

    Returns:
        ``extra`` when ``existing`` is empty; a newline-joined string when both
        are strings; otherwise a list of content blocks with ``existing`` first.
    """
    if not existing:
        # Nothing to prepend. Formatting a missing value would otherwise leak
        # the literal text "None" into the tool result.
        return extra
    if isinstance(existing, str) and isinstance(extra, str):
        return f"{existing}\n{extra}"

    def as_blocks(value) -> list:
        # Normalise to block-list form so mixed str/list payloads can be
        # concatenated instead of being flattened into a Python repr string.
        if isinstance(value, list):
            return list(value)
        return [{"type": "text", "text": value if isinstance(value, str) else str(value)}]

    return as_blocks(existing) + as_blocks(extra)


def dedupe_tool_results_in_user_messages(messages: List[dict]) -> List[dict]:
    """Ensure each tool_use_id has a single tool_result within a user message.

    If several ``tool_result`` blocks with the same ``tool_use_id`` appear in one
    user message (e.g. after merging consecutive user messages), the content of
    the later blocks is folded into the first one and the later blocks are
    removed. Blocks of other types, non-user messages, and messages whose content
    is not a non-empty list are left untouched.

    Args:
        messages: Message dicts with ``role`` and ``content`` keys. Messages that
            contain duplicates are modified in place: the first block's
            ``content`` is updated and the message's ``content`` list is replaced.

    Returns:
        The same ``messages`` list object that was passed in.
    """
    # Number of duplicate blocks folded away, keyed by tool_use_id (for the log line).
    dedup_counts: dict[str, int] = {}

    for msg in messages:
        if not isinstance(msg, dict) or msg.get("role") != "user":
            continue
        content = msg.get("content")
        if not isinstance(content, list) or not content:
            continue

        first_by_id: dict[str, dict] = {}
        new_content: list = []
        for block in content:
            is_tool_result = isinstance(block, dict) and block.get("type") == "tool_result" and "tool_use_id" in block
            if not is_tool_result:
                new_content.append(block)
                continue

            tid = block["tool_use_id"]
            first = first_by_id.get(tid)
            if first is None:
                # First result for this id: keep it in its original position.
                first_by_id[tid] = block
                new_content.append(block)
                continue

            # Duplicate: fold any payload into the first occurrence and drop the block.
            extra = block.get("content")
            if extra:
                first["content"] = _merge_tool_result_content(first.get("content"), extra)
            dedup_counts[tid] = dedup_counts.get(tid, 0) + 1

        # Only swap the list when something was actually removed.
        if len(new_content) != len(content):
            msg["content"] = new_content

    if dedup_counts:
        logger.error("[Anthropic] Deduped tool_result blocks in user messages: %s", dedup_counts)

    return messages
@staticmethod
def dedupe_tool_messages_for_llm_api(messages: List[Message]) -> List[Message]:
    """Remove repeated tool results from tool-role messages, keyed by tool_call_id.

    Messages are scanned in order and the first result seen for an id wins:

    - A tool message with ``tool_returns`` keeps only the returns whose id has not
      appeared earlier in the request; returns without an id are always kept.
    - If none of its returns survive, the message is kept only when it also has a
      ``tool_call_id`` with non-empty ``content`` and that id is still unseen.
    - A tool message without ``tool_returns`` is dropped when its ``tool_call_id``
      was already seen; one without an id passes through.

    Non-tool messages pass through untouched. A surviving message has its
    ``tool_returns`` attribute reassigned in place. Any removal is reported
    through the logger at error level.

    Returns:
        ``messages`` itself when it is empty/falsy, otherwise a new list holding
        the surviving messages in their original order.
    """
    if not messages:
        return messages

    from letta.log import get_logger

    logger = get_logger(__name__)

    known_ids: set[str] = set()
    dropped_messages = 0
    dropped_returns = 0
    kept: List[Message] = []

    for message in messages:
        if message.role != MessageRole.tool:
            kept.append(message)
            continue

        if not message.tool_returns:
            # Legacy shape: a single result identified by the message's own tool_call_id.
            legacy_id = getattr(message, "tool_call_id", None)
            if legacy_id and legacy_id in known_ids:
                dropped_messages += 1
                continue
            if legacy_id:
                known_ids.add(legacy_id)
            kept.append(message)
            continue

        # Explicit tool_returns: filter out the ones already answered earlier.
        fresh_returns = []
        for tool_return in message.tool_returns:
            return_id = getattr(tool_return, "tool_call_id", None)
            if not return_id:
                fresh_returns.append(tool_return)
            elif return_id in known_ids:
                dropped_returns += 1
            else:
                known_ids.add(return_id)
                fresh_returns.append(tool_return)

        if fresh_returns:
            message.tool_returns = fresh_returns
            kept.append(message)
            continue

        # Every return was a duplicate; the message survives only on the strength
        # of an unseen legacy tool_call_id that carries content.
        legacy_id = message.tool_call_id
        has_legacy_payload = bool(legacy_id and message.content and len(message.content) > 0)
        if not has_legacy_payload or legacy_id in known_ids:
            dropped_messages += 1
            continue
        known_ids.add(legacy_id)
        kept.append(message)

    if dropped_messages or dropped_returns:
        logger.error(
            "[Message] Deduped duplicate tool messages for request: removed_messages=%d, removed_returns=%d",
            dropped_messages,
            dropped_returns,
        )

    return kept


@staticmethod
def dedupe_tool_calls_for_llm_api(messages: List[Message]) -> List[Message]:
    """Drop repeated tool calls (same ``id``) inside each assistant message.

    Uniqueness is tracked per message: within one assistant message the first
    tool call for an id is kept and later ones with that id are removed. Tool
    calls without an id are always kept. The original note on this pass gives
    the motivation as Anthropic requiring unique tool_use ids per assistant
    message, which collapsing adjacent assistant/approval messages can violate.

    A message's ``tool_calls`` attribute is reassigned only when something was
    removed from it; the total number of removals is logged at error level.

    Returns:
        The ``messages`` list that was passed in.
    """
    if not messages:
        return messages

    from letta.log import get_logger

    logger = get_logger(__name__)

    total_removed = 0
    for message in messages:
        if message.role == MessageRole.assistant and message.tool_calls:
            ids_in_message: set[str] = set()
            retained = []
            for call in message.tool_calls:
                call_id = getattr(call, "id", None)
                if call_id and call_id in ids_in_message:
                    continue
                if call_id:
                    ids_in_message.add(call_id)
                retained.append(call)

            removed_here = len(message.tool_calls) - len(retained)
            if removed_here:
                message.tool_calls = retained
                total_removed += removed_here

    if total_removed:
        logger.error("[Message] Deduped duplicate tool_calls in assistant messages: removed=%d", total_removed)
    return messages