fix(core): sanitize messages to anthropic in the main path the same way (or similar) to how we do it in the token counter (#6044)

* fix(core): sanitize messages to anthropic in the main path the same way (or similar) to how we do it in the token counter

* fix: also patch poison error in backend by filtering lazily

* fix: remap streaming errors (what the fuck)

* fix: dedupe tool clals

* fix: cleanup, removed try/catch
This commit is contained in:
Charles Packer
2025-11-07 01:21:26 -08:00
committed by Caren Thomas
parent 363a5c1f92
commit 18029250d0
4 changed files with 245 additions and 4 deletions

View File

@@ -363,6 +363,11 @@ class AnthropicClient(LLMClientBase):
if llm_config.enable_reasoner:
data["messages"] = merge_heartbeats_into_tool_responses(data["messages"])
# Deduplicate tool_result blocks that reference the same tool_use_id within a single user message
# Anthropic requires a single result per tool_use. Merging consecutive user messages can accidentally
# produce multiple tool_result blocks with the same id; consolidate them here.
data["messages"] = dedupe_tool_results_in_user_messages(data["messages"])
# Prefix fill
# https://docs.anthropic.com/en/api/messages#body-messages
# NOTE: cannot prefill with tools for opus:
@@ -373,6 +378,61 @@ class AnthropicClient(LLMClientBase):
{"role": "assistant", "content": f"<{inner_thoughts_xml_tag}>"},
)
# As a final safeguard for request payloads: drop empty messages (instead of inserting placeholders)
# to avoid changing conversational meaning. Preserve an optional final assistant prefill if present.
if data.get("messages"):
sanitized_messages = []
dropped_messages = []
empty_blocks_removed = 0
total = len(data["messages"])
for i, msg in enumerate(data["messages"]):
role = msg.get("role")
content = msg.get("content")
is_final_assistant = i == total - 1 and role == "assistant"
# If content is a list, drop empty text blocks but keep non-text blocks
if isinstance(content, list) and len(content) > 0:
new_blocks = []
for block in content:
if isinstance(block, dict) and block.get("type") == "text":
if block.get("text", "").strip():
new_blocks.append(block)
else:
empty_blocks_removed += 1
else:
new_blocks.append(block)
msg["content"] = new_blocks
content = new_blocks
# Determine emptiness after trimming blocks
is_empty = (
content is None
or (isinstance(content, str) and not content.strip())
or (isinstance(content, list) and len(content) == 0)
)
# Drop empty messages except an allowed final assistant prefill
if is_empty and not is_final_assistant:
dropped_messages.append({"index": i, "role": role})
continue
sanitized_messages.append(msg)
data["messages"] = sanitized_messages
# Log unexpected sanitation events for visibility
if dropped_messages or empty_blocks_removed > 0:
logger.error(
"[Anthropic] Sanitized request messages: dropped=%s, empty_text_blocks_removed=%s, model=%s",
dropped_messages,
empty_blocks_removed,
data.get("model"),
)
# Ensure first message is user after sanitation
if not data["messages"] or data["messages"][0].get("role") != "user":
logger.error("[Anthropic] Inserting dummy first user message after sanitation to satisfy API constraints")
data["messages"] = [{"role": "user", "content": DUMMY_FIRST_USER_MESSAGE}] + data["messages"]
return data
async def count_tokens(self, messages: List[dict] = None, model: str = None, tools: List[OpenAITool] = None) -> int:
@@ -1048,6 +1108,60 @@ def merge_tool_results_into_user_messages(messages: List[dict]):
return merged_messages
def dedupe_tool_results_in_user_messages(messages: List[dict]) -> List[dict]:
"""Ensure each tool_use has a single tool_result within a user message.
If multiple tool_result blocks with the same tool_use_id appear in the same user message
(e.g., after merging consecutive user messages), merge their content and keep only one block.
"""
any_deduped = False
dedup_counts: dict[str, int] = {}
for msg in messages:
if not isinstance(msg, dict):
continue
if msg.get("role") != "user":
continue
content = msg.get("content")
if not isinstance(content, list) or len(content) == 0:
continue
seen: dict[str, dict] = {}
new_content: list = []
for block in content:
if isinstance(block, dict) and block.get("type") == "tool_result" and "tool_use_id" in block:
tid = block.get("tool_use_id")
if tid in seen:
# Merge duplicate tool_result into the first occurrence
first = seen[tid]
extra = block.get("content")
if extra:
if isinstance(first.get("content"), str) and isinstance(extra, str):
sep = "\n" if first["content"] and extra else ""
first["content"] = f"{first['content']}{sep}{extra}"
else:
# Fallback: coerce to strings and concat
first["content"] = f"{first.get('content')}{'\n' if first.get('content') else ''}{extra}"
any_deduped = True
dedup_counts[tid] = dedup_counts.get(tid, 0) + 1
# Skip appending duplicate
continue
else:
new_content.append(block)
seen[tid] = block
else:
new_content.append(block)
# Replace content if we pruned/merged duplicates
if len(new_content) != len(content):
msg["content"] = new_content
if any_deduped:
logger.error("[Anthropic] Deduped tool_result blocks in user messages: %s", dedup_counts)
return messages
def remap_finish_reason(stop_reason: str) -> str:
"""Remap Anthropic's 'stop_reason' to OpenAI 'finish_reason'