import copy import json import logging import re from typing import Dict, List, Optional, Union import anthropic import httpx from anthropic import AsyncStream from anthropic.types.beta import BetaMessage as AnthropicMessage, BetaRawMessageStreamEvent from anthropic.types.beta.message_create_params import MessageCreateParamsNonStreaming from anthropic.types.beta.messages import BetaMessageBatch from anthropic.types.beta.messages.batch_create_params import Request from letta.constants import FUNC_FAILED_HEARTBEAT_MESSAGE, REQ_HEARTBEAT_MESSAGE, REQUEST_HEARTBEAT_PARAM from letta.errors import ( ContextWindowExceededError, ErrorCode, LLMAuthenticationError, LLMBadRequestError, LLMConnectionError, LLMInsufficientCreditsError, LLMNotFoundError, LLMPermissionDeniedError, LLMProviderOverloaded, LLMRateLimitError, LLMServerError, LLMTimeoutError, LLMUnprocessableEntityError, ) from letta.helpers.datetime_helpers import get_utc_time_int from letta.helpers.decorators import deprecated from letta.helpers.json_helpers import sanitize_unicode_surrogates from letta.llm_api.anthropic_constants import ANTHROPIC_MAX_STRICT_TOOLS, ANTHROPIC_STRICT_MODE_ALLOWLIST from letta.llm_api.error_utils import is_insufficient_credits_message from letta.llm_api.helpers import add_inner_thoughts_to_functions, unpack_all_inner_thoughts_from_kwargs from letta.llm_api.llm_client_base import LLMClientBase from letta.local_llm.constants import INNER_THOUGHTS_KWARG, INNER_THOUGHTS_KWARG_DESCRIPTION from letta.log import get_logger from letta.otel.tracing import trace_method from letta.schemas.agent import AgentType from letta.schemas.enums import ProviderCategory from letta.schemas.llm_config import LLMConfig from letta.schemas.message import Message as PydanticMessage from letta.schemas.openai.chat_completion_request import Tool as OpenAITool from letta.schemas.openai.chat_completion_response import ( ChatCompletionResponse, Choice, FunctionCall, Message as ChoiceMessage, ToolCall, ) from letta.schemas.usage import LettaUsageStatistics from letta.settings import model_settings DUMMY_FIRST_USER_MESSAGE = "User initializing bootup sequence." logger = get_logger(__name__) class AnthropicClient(LLMClientBase): @trace_method @deprecated("Synchronous version of this is no longer valid. Will result in model_dump of coroutine") def request(self, request_data: dict, llm_config: LLMConfig) -> dict: client = self._get_anthropic_client(llm_config, async_client=False) betas: list[str] = [] # Opus 4.6 / Sonnet 4.6 Auto Thinking if llm_config.enable_reasoner: if llm_config.model.startswith("claude-opus-4-6") or llm_config.model.startswith("claude-sonnet-4-6"): betas.append("adaptive-thinking-2026-01-28") # Interleaved thinking for other reasoners (sync path parity) else: betas.append("interleaved-thinking-2025-05-14") # 1M context beta for Sonnet 4/4.5 or Opus 4.6 when enabled try: from letta.settings import model_settings if model_settings.anthropic_sonnet_1m and ( llm_config.model.startswith("claude-sonnet-4") or llm_config.model.startswith("claude-sonnet-4-5") ): betas.append("context-1m-2025-08-07") elif model_settings.anthropic_opus_1m and llm_config.model.startswith("claude-opus-4-6"): betas.append("context-1m-2025-08-07") except Exception: pass # Effort parameter for Opus 4.5, Opus 4.6, and Sonnet 4.6 - to extend to other models, modify the model check if ( llm_config.model.startswith("claude-opus-4-5") or llm_config.model.startswith("claude-opus-4-6") or llm_config.model.startswith("claude-sonnet-4-6") ) and llm_config.effort is not None: betas.append("effort-2025-11-24") # Max effort beta for Opus 4.6 / Sonnet 4.6 if ( llm_config.model.startswith("claude-opus-4-6") or llm_config.model.startswith("claude-sonnet-4-6") ) and llm_config.effort == "max": betas.append("max-effort-2026-01-24") # Context management for Opus 4.5 to preserve thinking blocks (improves cache hits) if llm_config.model.startswith("claude-opus-4-5") and llm_config.enable_reasoner: betas.append("context-management-2025-06-27") # Structured outputs beta - only when strict is enabled and model supports it if llm_config.strict and _supports_structured_outputs(llm_config.model): betas.append("structured-outputs-2025-11-13") try: if betas: response = client.beta.messages.create(**request_data, betas=betas) else: response = client.beta.messages.create(**request_data) return response.model_dump() except ValueError as e: # Anthropic SDK raises ValueError when streaming is required for long-running operations # See: https://github.com/anthropics/anthropic-sdk-python#streaming if "streaming is required" in str(e).lower(): logger.warning( "[Anthropic] Non-streaming request rejected due to potential long duration. Error: %s. " "Note: Synchronous fallback to streaming is not supported. Use async API instead.", str(e), ) # Re-raise as LLMBadRequestError (maps to 502 Bad Gateway) since this is a downstream provider constraint raise LLMBadRequestError( message="This operation may take longer than 10 minutes and requires streaming. " "Please use the async API (request_async) instead of the deprecated sync API. " f"Original error: {str(e)}", code=ErrorCode.INTERNAL_SERVER_ERROR, ) from e raise @trace_method async def request_async(self, request_data: dict, llm_config: LLMConfig) -> dict: request_data = sanitize_unicode_surrogates(request_data) client = await self._get_anthropic_client_async(llm_config, async_client=True) betas: list[str] = [] # Opus 4.6 / Sonnet 4.6 Auto Thinking if llm_config.enable_reasoner: if llm_config.model.startswith("claude-opus-4-6") or llm_config.model.startswith("claude-sonnet-4-6"): betas.append("adaptive-thinking-2026-01-28") # Interleaved thinking for other reasoners (sync path parity) else: betas.append("interleaved-thinking-2025-05-14") # 1M context beta for Sonnet 4/4.5 or Opus 4.6 when enabled try: from letta.settings import model_settings if model_settings.anthropic_sonnet_1m and ( llm_config.model.startswith("claude-sonnet-4") or llm_config.model.startswith("claude-sonnet-4-5") ): betas.append("context-1m-2025-08-07") elif model_settings.anthropic_opus_1m and llm_config.model.startswith("claude-opus-4-6"): betas.append("context-1m-2025-08-07") except Exception: pass # Effort parameter for Opus 4.5, Opus 4.6, and Sonnet 4.6 - to extend to other models, modify the model check if ( llm_config.model.startswith("claude-opus-4-5") or llm_config.model.startswith("claude-opus-4-6") or llm_config.model.startswith("claude-sonnet-4-6") ) and llm_config.effort is not None: betas.append("effort-2025-11-24") # Max effort beta for Opus 4.6 / Sonnet 4.6 if ( llm_config.model.startswith("claude-opus-4-6") or llm_config.model.startswith("claude-sonnet-4-6") ) and llm_config.effort == "max": betas.append("max-effort-2026-01-24") # Context management for Opus 4.5 to preserve thinking blocks (improves cache hits) if llm_config.model.startswith("claude-opus-4-5") and llm_config.enable_reasoner: betas.append("context-management-2025-06-27") # Structured outputs beta - only when strict is enabled and model supports it if llm_config.strict and _supports_structured_outputs(llm_config.model): betas.append("structured-outputs-2025-11-13") try: if betas: response = await client.beta.messages.create(**request_data, betas=betas) else: response = await client.beta.messages.create(**request_data) return response.model_dump() except ValueError as e: # Anthropic SDK raises ValueError when streaming is required for long-running operations # See: https://github.com/anthropics/anthropic-sdk-python#long-requests if "streaming is required" in str(e).lower(): logger.warning( "[Anthropic] Non-streaming request rejected due to potential long duration. Falling back to streaming mode. Error: %s", str(e), ) return await self._request_via_streaming(request_data, llm_config, betas) raise @trace_method async def _request_via_streaming(self, request_data: dict, llm_config: LLMConfig, betas: list[str]) -> dict: """ Fallback method that uses streaming to handle long-running requests. When Anthropic SDK detects a request may exceed 10 minutes, it requires streaming. This method streams the response and accumulates it into the same dict format as the non-streaming response. See: https://github.com/anthropics/anthropic-sdk-python#long-requests """ from letta.interfaces.anthropic_parallel_tool_call_streaming_interface import ( SimpleAnthropicStreamingInterface, ) interface = SimpleAnthropicStreamingInterface( requires_approval_tools=[], run_id=None, step_id=None, ) # Get the streaming response stream = await self.stream_async(request_data, llm_config) # Process the stream to accumulate the response async for _chunk in interface.process(stream): # We don't emit anything; we just want the fully-accumulated content pass # Reconstruct the response dict in the same format as non-streaming # Build content array from accumulated data content = [] # Add reasoning content (thinking blocks) reasoning_parts = interface.get_reasoning_content() for part in reasoning_parts: if hasattr(part, "reasoning") and part.reasoning: # Native thinking block content.append( { "type": "thinking", "thinking": part.reasoning, "signature": getattr(part, "signature", None), } ) elif hasattr(part, "data") and part.data: # Redacted thinking block content.append( { "type": "redacted_thinking", "data": part.data, } ) elif hasattr(part, "text") and part.text: # Text content (non-native reasoning) content.append( { "type": "text", "text": part.text, } ) # Add tool use if present tool_call = interface.get_tool_call_object() if tool_call: try: tool_input = json.loads(tool_call.function.arguments) except json.JSONDecodeError: tool_input = {} content.append( { "type": "tool_use", "id": tool_call.id, "name": tool_call.function.name, "input": tool_input, } ) # Calculate total input tokens (Anthropic reports input_tokens as non-cached only) # We need to add cache tokens if they're available input_tokens = interface.input_tokens or 0 cache_read_tokens = getattr(interface, "cache_read_tokens", 0) or 0 cache_creation_tokens = getattr(interface, "cache_creation_tokens", 0) or 0 # Build the response dict response_dict = { "id": interface.message_id or "msg_streaming_fallback", "type": "message", "role": "assistant", "content": content, "model": interface.model or llm_config.model, "stop_reason": "tool_use" if tool_call else "end_turn", "stop_sequence": None, "usage": { "input_tokens": input_tokens, "output_tokens": interface.output_tokens or 0, "cache_read_input_tokens": cache_read_tokens, "cache_creation_input_tokens": cache_creation_tokens, }, } logger.info( "[Anthropic] Streaming fallback completed successfully. Message ID: %s, Input tokens: %d, Output tokens: %d", response_dict["id"], response_dict["usage"]["input_tokens"], response_dict["usage"]["output_tokens"], ) return response_dict @trace_method async def stream_async(self, request_data: dict, llm_config: LLMConfig) -> AsyncStream[BetaRawMessageStreamEvent]: request_data = sanitize_unicode_surrogates(request_data) client = await self._get_anthropic_client_async(llm_config, async_client=True) request_data["stream"] = True # Add fine-grained tool streaming beta header for better streaming performance # This helps reduce buffering when streaming tool call parameters # See: https://docs.anthropic.com/en/docs/build-with-claude/tool-use/fine-grained-streaming betas = ["fine-grained-tool-streaming-2025-05-14"] # Opus 4.6 / Sonnet 4.6 Auto Thinking if llm_config.enable_reasoner: if llm_config.model.startswith("claude-opus-4-6") or llm_config.model.startswith("claude-sonnet-4-6"): betas.append("adaptive-thinking-2026-01-28") # Interleaved thinking for other reasoners (sync path parity) else: betas.append("interleaved-thinking-2025-05-14") # 1M context beta for Sonnet 4/4.5 or Opus 4.6 when enabled try: from letta.settings import model_settings if model_settings.anthropic_sonnet_1m and ( llm_config.model.startswith("claude-sonnet-4") or llm_config.model.startswith("claude-sonnet-4-5") ): betas.append("context-1m-2025-08-07") elif model_settings.anthropic_opus_1m and llm_config.model.startswith("claude-opus-4-6"): betas.append("context-1m-2025-08-07") except Exception: pass # Effort parameter for Opus 4.5, Opus 4.6, and Sonnet 4.6 - to extend to other models, modify the model check if ( llm_config.model.startswith("claude-opus-4-5") or llm_config.model.startswith("claude-opus-4-6") or llm_config.model.startswith("claude-sonnet-4-6") ) and llm_config.effort is not None: betas.append("effort-2025-11-24") # Max effort beta for Opus 4.6 / Sonnet 4.6 if ( llm_config.model.startswith("claude-opus-4-6") or llm_config.model.startswith("claude-sonnet-4-6") ) and llm_config.effort == "max": betas.append("max-effort-2026-01-24") # Context management for Opus 4.5 to preserve thinking blocks (improves cache hits) if llm_config.model.startswith("claude-opus-4-5") and llm_config.enable_reasoner: betas.append("context-management-2025-06-27") # Structured outputs beta - only when strict is enabled and model supports it if llm_config.strict and _supports_structured_outputs(llm_config.model): betas.append("structured-outputs-2025-11-13") # log failed requests try: return await client.beta.messages.create(**request_data, betas=betas) except Exception as e: logger.error(f"Error streaming Anthropic request: {e} with request data: {json.dumps(request_data)}") raise e @trace_method async def send_llm_batch_request_async( self, agent_type: AgentType, agent_messages_mapping: Dict[str, List[PydanticMessage]], agent_tools_mapping: Dict[str, List[dict]], agent_llm_config_mapping: Dict[str, LLMConfig], ) -> BetaMessageBatch: """ Sends a batch request to the Anthropic API using the provided agent messages and tools mappings. Args: agent_messages_mapping: A dict mapping agent_id to their list of PydanticMessages. agent_tools_mapping: A dict mapping agent_id to their list of tool dicts. agent_llm_config_mapping: A dict mapping agent_id to their LLM config Returns: BetaMessageBatch: The batch response from the Anthropic API. Raises: ValueError: If the sets of agent_ids in the two mappings do not match. Exception: Transformed errors from the underlying API call. """ # Validate that both mappings use the same set of agent_ids. if set(agent_messages_mapping.keys()) != set(agent_tools_mapping.keys()): raise ValueError("Agent mappings for messages and tools must use the same agent_ids.") try: requests = { agent_id: self.build_request_data( agent_type=agent_type, messages=agent_messages_mapping[agent_id], llm_config=agent_llm_config_mapping[agent_id], tools=agent_tools_mapping[agent_id], ) for agent_id in agent_messages_mapping } client = await self._get_anthropic_client_async(list(agent_llm_config_mapping.values())[0], async_client=True) anthropic_requests = [ Request(custom_id=agent_id, params=MessageCreateParamsNonStreaming(**params)) for agent_id, params in requests.items() ] batch_response = await client.beta.messages.batches.create(requests=anthropic_requests) return batch_response except Exception as e: # Enhance logging here if additional context is needed logger.error("Error during send_llm_batch_request_async.", exc_info=True) raise self.handle_llm_error(e) @trace_method def _get_anthropic_client( self, llm_config: LLMConfig, async_client: bool = False ) -> Union[anthropic.AsyncAnthropic, anthropic.Anthropic]: api_key, _, _ = self.get_byok_overrides(llm_config) # For claude-pro-max provider, use OAuth Bearer token instead of api_key is_oauth_provider = llm_config.provider_name == "claude-pro-max" if async_client: if api_key: if is_oauth_provider: return anthropic.AsyncAnthropic( max_retries=model_settings.anthropic_max_retries, default_headers={ "Authorization": f"Bearer {api_key}", "anthropic-version": "2023-06-01", "anthropic-beta": "oauth-2025-04-20", }, ) return anthropic.AsyncAnthropic(api_key=api_key, max_retries=model_settings.anthropic_max_retries) return anthropic.AsyncAnthropic(max_retries=model_settings.anthropic_max_retries) if api_key: if is_oauth_provider: return anthropic.Anthropic( max_retries=model_settings.anthropic_max_retries, default_headers={ "Authorization": f"Bearer {api_key}", "anthropic-version": "2023-06-01", "anthropic-beta": "oauth-2025-04-20", }, ) return anthropic.Anthropic(api_key=api_key, max_retries=model_settings.anthropic_max_retries) return anthropic.Anthropic(max_retries=model_settings.anthropic_max_retries) @trace_method async def _get_anthropic_client_async( self, llm_config: LLMConfig, async_client: bool = False ) -> Union[anthropic.AsyncAnthropic, anthropic.Anthropic]: api_key, _, _ = await self.get_byok_overrides_async(llm_config) # For claude-pro-max provider, use OAuth Bearer token instead of api_key is_oauth_provider = llm_config.provider_name == "claude-pro-max" if async_client: if api_key: if is_oauth_provider: return anthropic.AsyncAnthropic( max_retries=model_settings.anthropic_max_retries, default_headers={ "Authorization": f"Bearer {api_key}", "anthropic-version": "2023-06-01", "anthropic-beta": "oauth-2025-04-20", }, ) return anthropic.AsyncAnthropic(api_key=api_key, max_retries=model_settings.anthropic_max_retries) return anthropic.AsyncAnthropic(max_retries=model_settings.anthropic_max_retries) if api_key: if is_oauth_provider: return anthropic.Anthropic( max_retries=model_settings.anthropic_max_retries, default_headers={ "Authorization": f"Bearer {api_key}", "anthropic-version": "2023-06-01", "anthropic-beta": "oauth-2025-04-20", }, ) return anthropic.Anthropic(api_key=api_key, max_retries=model_settings.anthropic_max_retries) return anthropic.Anthropic(max_retries=model_settings.anthropic_max_retries) @trace_method def build_request_data( self, agent_type: AgentType, # if react, use native content + strip heartbeats messages: List[PydanticMessage], llm_config: LLMConfig, tools: Optional[List[dict]] = None, force_tool_call: Optional[str] = None, requires_subsequent_tool_call: bool = False, tool_return_truncation_chars: Optional[int] = None, ) -> dict: # TODO: This needs to get cleaned up. The logic here is pretty confusing. # TODO: I really want to get rid of prefixing, it's a recipe for disaster code maintenance wise prefix_fill = True if agent_type != AgentType.letta_v1_agent else False is_v1 = agent_type == AgentType.letta_v1_agent # Determine local behavior for putting inner thoughts in kwargs without mutating llm_config put_kwargs = bool(llm_config.put_inner_thoughts_in_kwargs) and not is_v1 if not self.use_tool_naming: raise NotImplementedError("Only tool calling supported on Anthropic API requests") if not llm_config.max_tokens: # TODO strip this default once we add provider-specific defaults max_output_tokens = 4096 # the minimum max tokens (for Haiku 3) else: max_output_tokens = llm_config.max_tokens # Strip provider prefix from model name if present (e.g., "anthropic/claude-..." -> "claude-...") # This handles cases where the handle format was incorrectly passed as the model name model_name = llm_config.model if "/" in model_name: model_name = model_name.split("/", 1)[-1] data = { "model": model_name, "max_tokens": max_output_tokens, "temperature": llm_config.temperature, } # Extended Thinking # Note: Anthropic does not allow thinking when forcing tool use with split_thread_agent should_enable_thinking = ( self.is_reasoning_model(llm_config) and llm_config.enable_reasoner and not (agent_type == AgentType.split_thread_agent and force_tool_call is not None) ) if should_enable_thinking: # Opus 4.6 / Sonnet 4.6 uses Auto Thinking (no budget tokens) if llm_config.model.startswith("claude-opus-4-6") or llm_config.model.startswith("claude-sonnet-4-6"): data["thinking"] = { "type": "adaptive", } else: # Traditional extended thinking with budget tokens thinking_budget = max(llm_config.max_reasoning_tokens, 1024) if thinking_budget != llm_config.max_reasoning_tokens: logger.warning( f"Max reasoning tokens must be at least 1024 for Claude. Setting max_reasoning_tokens to 1024 for model {llm_config.model}." ) data["thinking"] = { "type": "enabled", "budget_tokens": thinking_budget, } # `temperature` may only be set to 1 when thinking is enabled. Please consult our documentation at https://docs.anthropic.com/en/docs/build-with-claude/extended-thinking#important-considerations-when-using-extended-thinking' data["temperature"] = 1.0 # Silently disable prefix_fill for now prefix_fill = False # Effort configuration for Opus 4.5, Opus 4.6, and Sonnet 4.6 (controls token spending) # To extend to other models, modify the model check if ( llm_config.model.startswith("claude-opus-4-5") or llm_config.model.startswith("claude-opus-4-6") or llm_config.model.startswith("claude-sonnet-4-6") ) and llm_config.effort is not None: data["output_config"] = {"effort": llm_config.effort} # Context management for Opus 4.5 to preserve thinking blocks and improve cache hits # See: https://docs.anthropic.com/en/docs/build-with-claude/context-editing if llm_config.model.startswith("claude-opus-4-5") and llm_config.enable_reasoner: data["context_management"] = { "edits": [ { "type": "clear_thinking_20251015", "keep": "all", # Preserve all thinking blocks for maximum cache performance } ] } # Structured outputs via response_format # DISABLED: Commenting out structured outputs to investigate TTFT latency impact # See PR #7495 for original implementation # if hasattr(llm_config, "response_format") and isinstance(llm_config.response_format, JsonSchemaResponseFormat): # data["output_format"] = { # "type": "json_schema", # "schema": llm_config.response_format.json_schema["schema"], # } # Tools # For an overview on tool choice: # https://docs.anthropic.com/en/docs/build-with-claude/tool-use/overview if not tools: # Special case for summarization path tools_for_request = None tool_choice = None elif self.is_reasoning_model(llm_config) and llm_config.enable_reasoner or agent_type == AgentType.letta_v1_agent: # NOTE: reasoning models currently do not allow for `any` # NOTE: react agents should always have at least auto on, since the precense/absense of tool calls controls chaining if agent_type == AgentType.split_thread_agent and force_tool_call is not None: tool_choice = {"type": "tool", "name": force_tool_call, "disable_parallel_tool_use": True} # When forcing a specific tool, only include that tool tools_for_request = [OpenAITool(function=f) for f in tools if f["name"] == force_tool_call] else: tool_choice = {"type": "auto", "disable_parallel_tool_use": True} tools_for_request = [OpenAITool(function=f) for f in tools] elif force_tool_call is not None: tool_choice = {"type": "tool", "name": force_tool_call, "disable_parallel_tool_use": True} tools_for_request = [OpenAITool(function=f) for f in tools if f["name"] == force_tool_call] # need to have this setting to be able to put inner thoughts in kwargs if not put_kwargs: if is_v1: # For v1 agents, native content is used and kwargs must remain disabled to avoid conflicts logger.warning( "Forced tool call requested but inner_thoughts_in_kwargs is disabled for v1 agent; proceeding without inner thoughts in kwargs." ) else: logger.warning( f"Force enabling inner thoughts in kwargs for Claude due to forced tool call: {force_tool_call} (local override only)" ) put_kwargs = True else: tool_choice = {"type": "any", "disable_parallel_tool_use": True} tools_for_request = [OpenAITool(function=f) for f in tools] if tools is not None else None # Add tool choice if tool_choice: data["tool_choice"] = tool_choice # Add inner thoughts kwarg # TODO: Can probably make this more efficient if tools_for_request and len(tools_for_request) > 0 and put_kwargs: tools_with_inner_thoughts = add_inner_thoughts_to_functions( functions=[t.function.model_dump() for t in tools_for_request], inner_thoughts_key=INNER_THOUGHTS_KWARG, inner_thoughts_description=INNER_THOUGHTS_KWARG_DESCRIPTION, ) tools_for_request = [OpenAITool(function=f) for f in tools_with_inner_thoughts] if tools_for_request and len(tools_for_request) > 0: # TODO eventually enable parallel tool use # Enable strict mode when strict is enabled and model supports it use_strict = llm_config.strict and _supports_structured_outputs(llm_config.model) data["tools"] = convert_tools_to_anthropic_format( tools_for_request, use_strict=use_strict, ) # Add cache control to the last tool for caching tool definitions if len(data["tools"]) > 0: data["tools"][-1]["cache_control"] = {"type": "ephemeral"} # Messages inner_thoughts_xml_tag = "thinking" # Move 'system' to the top level if messages[0].role != "system": raise RuntimeError(f"First message is not a system message, instead has role {messages[0].role}") system_content = messages[0].content if isinstance(messages[0].content, str) else messages[0].content[0].text data["system"] = self._add_cache_control_to_system_message(system_content) data["messages"] = PydanticMessage.to_anthropic_dicts_from_list( messages=messages[1:], current_model=llm_config.model, inner_thoughts_xml_tag=inner_thoughts_xml_tag, put_inner_thoughts_in_kwargs=put_kwargs, # if react, use native content + strip heartbeats native_content=is_v1, strip_request_heartbeat=is_v1, tool_return_truncation_chars=tool_return_truncation_chars, ) # Ensure first message is user if data["messages"][0]["role"] != "user": data["messages"] = [{"role": "user", "content": DUMMY_FIRST_USER_MESSAGE}] + data["messages"] # Handle alternating messages data["messages"] = merge_tool_results_into_user_messages(data["messages"]) if agent_type == AgentType.letta_v1_agent: # Both drop heartbeats in the payload data["messages"] = drop_heartbeats(data["messages"]) # And drop heartbeats in the tools if "tools" in data: for tool in data["tools"]: tool["input_schema"]["properties"].pop(REQUEST_HEARTBEAT_PARAM, None) if "required" in tool["input_schema"] and REQUEST_HEARTBEAT_PARAM in tool["input_schema"]["required"]: # NOTE: required is not always present tool["input_schema"]["required"].remove(REQUEST_HEARTBEAT_PARAM) else: # Strip heartbeat pings if extended thinking if llm_config.enable_reasoner: data["messages"] = merge_heartbeats_into_tool_responses(data["messages"]) # Deduplicate tool_result blocks that reference the same tool_use_id within a single user message # Anthropic requires a single result per tool_use. Merging consecutive user messages can accidentally # produce multiple tool_result blocks with the same id; consolidate them here. data["messages"] = dedupe_tool_results_in_user_messages(data["messages"]) # Add cache control to final message for incremental conversation caching # Per Anthropic docs: "During each turn, we mark the final block of the final message with # cache_control so the conversation can be incrementally cached." data["messages"] = self._add_cache_control_to_messages(data["messages"]) # Debug: Log cache control placement logger.debug(f"Anthropic request has {len(data.get('messages', []))} messages") if data.get("messages") and len(data["messages"]) > 0: last_msg = data["messages"][-1] logger.debug(f"Last message role: {last_msg.get('role')}, content type: {type(last_msg.get('content'))}") if isinstance(last_msg.get("content"), list) and len(last_msg["content"]) > 0: last_block = last_msg["content"][-1] logger.debug(f"Last content block type: {last_block.get('type')}, has cache_control: {'cache_control' in last_block}") if "cache_control" in last_block: logger.debug(f"Cache control value: {last_block['cache_control']}") # Prefix fill # https://docs.anthropic.com/en/api/messages#body-messages # NOTE: cannot prefill with tools for opus: # Your API request included an `assistant` message in the final position, which would pre-fill the `assistant` response. When using tools with "claude-3-opus-20240229" if prefix_fill and not put_kwargs and "opus" not in data["model"]: data["messages"].append( # Start the thinking process for the assistant {"role": "assistant", "content": f"<{inner_thoughts_xml_tag}>"}, ) # As a final safeguard for request payloads: drop empty messages (instead of inserting placeholders) # to avoid changing conversational meaning. Preserve an optional final assistant prefill if present. if data.get("messages"): sanitized_messages = [] dropped_messages = [] empty_blocks_removed = 0 total = len(data["messages"]) for i, msg in enumerate(data["messages"]): role = msg.get("role") content = msg.get("content") is_final_assistant = i == total - 1 and role == "assistant" # If content is a list, drop empty text blocks but keep non-text blocks if isinstance(content, list) and len(content) > 0: new_blocks = [] for block in content: if isinstance(block, dict) and block.get("type") == "text": if block.get("text", "").strip(): new_blocks.append(block) else: empty_blocks_removed += 1 else: new_blocks.append(block) msg["content"] = new_blocks content = new_blocks # Determine emptiness after trimming blocks is_empty = ( content is None or (isinstance(content, str) and not content.strip()) or (isinstance(content, list) and len(content) == 0) ) # Drop empty messages except an allowed final assistant prefill if is_empty and not is_final_assistant: dropped_messages.append({"index": i, "role": role}) continue sanitized_messages.append(msg) data["messages"] = sanitized_messages # Log unexpected sanitation events for visibility if dropped_messages or empty_blocks_removed > 0: logger.error( "[Anthropic] Sanitized request messages: dropped=%s, empty_text_blocks_removed=%s, model=%s", dropped_messages, empty_blocks_removed, data.get("model"), ) # Ensure first message is user after sanitation if not data["messages"] or data["messages"][0].get("role") != "user": logger.error("[Anthropic] Inserting dummy first user message after sanitation to satisfy API constraints") data["messages"] = [{"role": "user", "content": DUMMY_FIRST_USER_MESSAGE}] + data["messages"] return data async def count_tokens(self, messages: List[dict] = None, model: str = None, tools: List[OpenAITool] = None) -> int: logging.getLogger("httpx").setLevel(logging.WARNING) # Use the default client; token counting is lightweight and does not require BYOK overrides client = anthropic.AsyncAnthropic() if messages and len(messages) == 0: messages = None if tools and len(tools) > 0: # Token counting endpoint requires additionalProperties: false (use_strict=True) # but does NOT support the `strict` field on tools (add_strict_field=False) anthropic_tools = convert_tools_to_anthropic_format( tools, use_strict=True, add_strict_field=False, ) else: anthropic_tools = None # Convert final thinking blocks to text to work around token counting endpoint limitation. # The token counting endpoint rejects messages where the final content block is thinking, # even though the main API supports this with the interleaved-thinking beta. # We convert (not strip) to preserve accurate token counts. # TODO: Remove this workaround if Anthropic fixes the token counting endpoint. thinking_enabled = False messages_for_counting = messages if messages and len(messages) > 0: messages_for_counting = copy.deepcopy(messages) # Scan all assistant messages and convert any final thinking blocks to text for message in messages_for_counting: if message.get("role") == "assistant": content = message.get("content") # Check for thinking in any format if isinstance(content, list) and len(content) > 0: # Check if message has any thinking blocks (to enable thinking mode) has_thinking = any( isinstance(part, dict) and part.get("type") in {"thinking", "redacted_thinking"} for part in content ) if has_thinking: thinking_enabled = True # If final block is thinking, handle it last_block = content[-1] if isinstance(last_block, dict) and last_block.get("type") in {"thinking", "redacted_thinking"}: if len(content) == 1: # Thinking-only message: add text at end (don't convert the thinking) # API requires first block to be thinking when thinking is enabled content.append({"type": "text", "text": "."}) else: # Multiple blocks: convert final thinking to text if last_block["type"] == "thinking": content[-1] = {"type": "text", "text": last_block.get("thinking", "")} elif last_block["type"] == "redacted_thinking": content[-1] = {"type": "text", "text": last_block.get("data", "[redacted]")} elif isinstance(content, str) and "" in content: # Handle XML-style thinking in string content thinking_enabled = True # Replace empty content with placeholder (Anthropic requires non-empty content except for final assistant message) if messages_for_counting: for i, msg in enumerate(messages_for_counting): content = msg.get("content") is_final_assistant = i == len(messages_for_counting) - 1 and msg.get("role") == "assistant" # Check if content is empty and needs replacement if content is None: if not is_final_assistant: msg["content"] = "." elif isinstance(content, str) and not content.strip(): if not is_final_assistant: msg["content"] = "." elif isinstance(content, list): if len(content) == 0: # Preserve truly empty list for final assistant message if not is_final_assistant: msg["content"] = [{"type": "text", "text": "."}] else: # Always fix empty text blocks within lists, even for final assistant message # The API exemption is for truly empty content (empty string or empty list), # not for lists with explicit empty text blocks for block in content: if isinstance(block, dict) and block.get("type") == "text": if not block.get("text", "").strip(): block["text"] = "." # Strip trailing whitespace from final assistant message # Anthropic API rejects messages where "final assistant content cannot end with trailing whitespace" if is_final_assistant: if isinstance(content, str): msg["content"] = content.rstrip() elif isinstance(content, list) and len(content) > 0: # Find and strip trailing whitespace from the last text block for block in reversed(content): if isinstance(block, dict) and block.get("type") == "text": block["text"] = block.get("text", "").rstrip() break try: count_params = { "model": model or "claude-3-7-sonnet-20250219", "messages": messages_for_counting or [{"role": "user", "content": "hi"}], "tools": anthropic_tools or [], } betas: list[str] = [] if thinking_enabled: # Match interleaved thinking behavior so token accounting is consistent count_params["thinking"] = {"type": "enabled", "budget_tokens": 16000} betas.append("interleaved-thinking-2025-05-14") # Opt-in to 1M context if enabled for this model in settings try: if ( model and model_settings.anthropic_sonnet_1m and (model.startswith("claude-sonnet-4") or model.startswith("claude-sonnet-4-5")) ): betas.append("context-1m-2025-08-07") elif model and model_settings.anthropic_opus_1m and model.startswith("claude-opus-4-6"): betas.append("context-1m-2025-08-07") except Exception: pass # Opus 4.5 beta flags for effort and context management # Note: effort beta is added if model is kevlar (actual effort value is in count_params) # Context management beta is added for consistency with main requests if model and model.startswith("claude-opus-4-5"): # Add effort beta if output_config is present in count_params if "output_config" in count_params: betas.append("effort-2025-11-24") # Add context management beta if thinking is enabled if thinking_enabled: betas.append("context-management-2025-06-27") # Structured outputs beta - only for supported models # DISABLED: Commenting out structured outputs to investigate TTFT latency impact # See PR #7495 for original implementation # if model and _supports_structured_outputs(model): # betas.append("structured-outputs-2025-11-13") if betas: result = await client.beta.messages.count_tokens(**count_params, betas=betas) else: result = await client.beta.messages.count_tokens(**count_params) except Exception as e: raise self.handle_llm_error(e) token_count = result.input_tokens if messages is None: token_count -= 8 return token_count def is_reasoning_model(self, llm_config: LLMConfig) -> bool: return ( llm_config.model.startswith("claude-3-7-sonnet") or llm_config.model.startswith("claude-sonnet-4") or llm_config.model.startswith("claude-opus-4") or llm_config.model.startswith("claude-haiku-4-5") # Opus 4.5 support - to extend effort parameter to other models, modify this check or llm_config.model.startswith("claude-opus-4-5") # Opus 4.6 support - uses Auto Thinking or llm_config.model.startswith("claude-opus-4-6") # Sonnet 4.6 support - same API as Opus 4.6 or llm_config.model.startswith("claude-sonnet-4-6") ) @trace_method def handle_llm_error(self, e: Exception, llm_config: Optional[LLMConfig] = None) -> Exception: is_byok = (llm_config.provider_category == ProviderCategory.byok) if llm_config else None # make sure to check for overflow errors, regardless of error type error_str = str(e).lower() if ( "prompt is too long" in error_str or "exceed context limit" in error_str or "exceeds context" in error_str or "too many total text bytes" in error_str or "total text bytes" in error_str or "request_too_large" in error_str or "request exceeds the maximum size" in error_str ): logger.warning(f"[Anthropic] Context window exceeded: {str(e)}") return ContextWindowExceededError( message=f"Context window exceeded for Anthropic: {str(e)}", details={"is_byok": is_byok}, ) if isinstance(e, anthropic.APITimeoutError): logger.warning(f"[Anthropic] Request timeout: {e}") return LLMTimeoutError( message=f"Request to Anthropic timed out: {str(e)}", code=ErrorCode.TIMEOUT, details={"cause": str(e.__cause__) if e.__cause__ else None, "is_byok": is_byok}, ) if isinstance(e, anthropic.APIConnectionError): logger.warning(f"[Anthropic] API connection error: {e.__cause__}") return LLMConnectionError( message=f"Failed to connect to Anthropic: {str(e)}", code=ErrorCode.INTERNAL_SERVER_ERROR, details={"cause": str(e.__cause__) if e.__cause__ else None, "is_byok": is_byok}, ) # Handle httpx.RemoteProtocolError which can occur during streaming # when the remote server closes the connection unexpectedly # (e.g., "peer closed connection without sending complete message body") if isinstance(e, httpx.RemoteProtocolError): logger.warning(f"[Anthropic] Remote protocol error during streaming: {e}") return LLMConnectionError( message=f"Connection error during Anthropic streaming: {str(e)}", code=ErrorCode.INTERNAL_SERVER_ERROR, details={"cause": str(e.__cause__) if e.__cause__ else None, "is_byok": is_byok}, ) # Handle httpx network errors which can occur during streaming # when the connection is unexpectedly closed while reading/writing if isinstance(e, (httpx.ReadError, httpx.WriteError, httpx.ConnectError)): logger.warning(f"[Anthropic] Network error during streaming: {type(e).__name__}: {e}") return LLMConnectionError( message=f"Network error during Anthropic streaming: {str(e)}", code=ErrorCode.INTERNAL_SERVER_ERROR, details={"cause": str(e.__cause__) if e.__cause__ else None, "error_type": type(e).__name__, "is_byok": is_byok}, ) if isinstance(e, anthropic.RateLimitError): logger.warning("[Anthropic] Rate limited (429). Consider backoff.") return LLMRateLimitError( message=f"Rate limited by Anthropic: {str(e)}", code=ErrorCode.RATE_LIMIT_EXCEEDED, details={"is_byok": is_byok}, ) if isinstance(e, anthropic.BadRequestError): logger.warning(f"[Anthropic] Bad request: {str(e)}") error_str = str(e).lower() if ( "prompt is too long" in error_str or "exceed context limit" in error_str or "exceeds context" in error_str or "too many total text bytes" in error_str or "total text bytes" in error_str ): # If the context window is too large, we expect to receive either: # 400 - {'type': 'error', 'error': {'type': 'invalid_request_error', 'message': 'prompt is too long: 200758 tokens > 200000 maximum'}} # 400 - {'type': 'error', 'error': {'type': 'invalid_request_error', 'message': 'input length and `max_tokens` exceed context limit: 173298 + 32000 > 200000, decrease input length or `max_tokens` and try again'}} return ContextWindowExceededError( message=f"Bad request to Anthropic (context window exceeded): {str(e)}", details={"is_byok": is_byok}, ) else: return LLMBadRequestError( message=f"Bad request to Anthropic: {str(e)}", code=ErrorCode.INTERNAL_SERVER_ERROR, details={"is_byok": is_byok}, ) if isinstance(e, anthropic.AuthenticationError): logger.warning(f"[Anthropic] Authentication error: {str(e)}") return LLMAuthenticationError( message=f"Authentication failed with Anthropic: {str(e)}", code=ErrorCode.INTERNAL_SERVER_ERROR, details={"is_byok": is_byok}, ) if isinstance(e, anthropic.PermissionDeniedError): logger.warning(f"[Anthropic] Permission denied: {str(e)}") return LLMPermissionDeniedError( message=f"Permission denied by Anthropic: {str(e)}", code=ErrorCode.INTERNAL_SERVER_ERROR, details={"is_byok": is_byok}, ) if isinstance(e, anthropic.NotFoundError): logger.warning(f"[Anthropic] Resource not found: {str(e)}") return LLMNotFoundError( message=f"Resource not found in Anthropic: {str(e)}", code=ErrorCode.INTERNAL_SERVER_ERROR, details={"is_byok": is_byok}, ) if isinstance(e, anthropic.UnprocessableEntityError): logger.warning(f"[Anthropic] Unprocessable entity: {str(e)}") return LLMUnprocessableEntityError( message=f"Invalid request content for Anthropic: {str(e)}", code=ErrorCode.INTERNAL_SERVER_ERROR, details={"is_byok": is_byok}, ) if isinstance(e, anthropic.InternalServerError): error_str = str(e).lower() if "overflow" in error_str or "upstream connect error" in error_str: logger.warning(f"[Anthropic] Upstream infrastructure error (transient): {str(e)}") return LLMServerError( message=f"Anthropic upstream infrastructure error (transient, may resolve on retry): {str(e)}", code=ErrorCode.INTERNAL_SERVER_ERROR, details={ "status_code": e.status_code if hasattr(e, "status_code") else None, "transient": True, }, ) if "overloaded" in error_str: return LLMProviderOverloaded( message=f"Anthropic API is overloaded: {str(e)}", code=ErrorCode.INTERNAL_SERVER_ERROR, ) logger.warning(f"[Anthropic] Internal server error: {str(e)}") return LLMServerError( message=f"Anthropic internal server error: {str(e)}", code=ErrorCode.INTERNAL_SERVER_ERROR, details={ "status_code": e.status_code if hasattr(e, "status_code") else None, "response": str(e.response) if hasattr(e, "response") else None, }, ) if isinstance(e, anthropic.APIStatusError): logger.warning(f"[Anthropic] API status error: {str(e)}") if hasattr(e, "status_code") and e.status_code == 402 or is_insufficient_credits_message(str(e)): msg = str(e) return LLMInsufficientCreditsError( message=f"Insufficient credits (BYOK): {msg}" if is_byok else f"Insufficient credits: {msg}", code=ErrorCode.PAYMENT_REQUIRED, details={"status_code": getattr(e, "status_code", None), "is_byok": is_byok}, ) if hasattr(e, "status_code") and e.status_code == 413: logger.warning(f"[Anthropic] Request too large (413): {str(e)}") return ContextWindowExceededError( message=f"Request too large for Anthropic (413): {str(e)}", details={"is_byok": is_byok}, ) if "overloaded" in str(e).lower(): return LLMProviderOverloaded( message=f"Anthropic API is overloaded: {str(e)}", code=ErrorCode.INTERNAL_SERVER_ERROR, details={"is_byok": is_byok}, ) return LLMServerError( message=f"Anthropic API error: {str(e)}", code=ErrorCode.INTERNAL_SERVER_ERROR, details={ "status_code": e.status_code if hasattr(e, "status_code") else None, "response": str(e.response) if hasattr(e, "response") else None, "is_byok": is_byok, }, ) return super().handle_llm_error(e, llm_config=llm_config) def extract_usage_statistics(self, response_data: dict | None, llm_config: LLMConfig) -> LettaUsageStatistics: """Extract usage statistics from Anthropic response and return as LettaUsageStatistics.""" if not response_data: return LettaUsageStatistics() response = AnthropicMessage(**response_data) prompt_tokens = response.usage.input_tokens completion_tokens = response.usage.output_tokens # Extract cache data if available (None means not reported, 0 means reported as 0) cache_read_tokens = None cache_creation_tokens = None if hasattr(response.usage, "cache_read_input_tokens"): cache_read_tokens = response.usage.cache_read_input_tokens if hasattr(response.usage, "cache_creation_input_tokens"): cache_creation_tokens = response.usage.cache_creation_input_tokens # Per Anthropic docs: "Total input tokens in a request is the summation of # input_tokens, cache_creation_input_tokens, and cache_read_input_tokens." actual_input_tokens = prompt_tokens + (cache_read_tokens or 0) + (cache_creation_tokens or 0) return LettaUsageStatistics( prompt_tokens=actual_input_tokens, completion_tokens=completion_tokens, total_tokens=actual_input_tokens + completion_tokens, cached_input_tokens=cache_read_tokens, cache_write_tokens=cache_creation_tokens, ) # TODO: Input messages doesn't get used here # TODO: Clean up this interface @trace_method async def convert_response_to_chat_completion( self, response_data: dict, input_messages: List[PydanticMessage], llm_config: LLMConfig, ) -> ChatCompletionResponse: if isinstance(response_data, str): raise LLMServerError( message="Anthropic endpoint returned a raw string instead of a JSON object. This usually indicates the endpoint URL is incorrect or returned an error page.", code=ErrorCode.INTERNAL_SERVER_ERROR, ) """ Example response from Claude 3: response.json = { 'id': 'msg_01W1xg9hdRzbeN2CfZM7zD2w', 'type': 'message', 'role': 'assistant', 'content': [ { 'type': 'text', 'text': "Analyzing user login event. This is Chad's first interaction with me. I will adjust my personality and rapport accordingly." }, { 'type': 'tool_use', 'id': 'toolu_01Ka4AuCmfvxiidnBZuNfP1u', 'name': 'core_memory_append', 'input': { 'name': 'human', 'content': 'Chad is logging in for the first time. I will aim to build a warm and welcoming rapport.', 'request_heartbeat': True } } ], 'model': 'claude-3-haiku-20240307', 'stop_reason': 'tool_use', 'stop_sequence': None, 'usage': { 'input_tokens': 3305, 'output_tokens': 141 } } """ response = AnthropicMessage(**response_data) finish_reason = remap_finish_reason(str(response.stop_reason)) # Extract usage via centralized method from letta.schemas.enums import ProviderType usage_stats = self.extract_usage_statistics(response_data, llm_config).to_usage(ProviderType.anthropic) content = None reasoning_content = None reasoning_content_signature = None redacted_reasoning_content = None tool_calls: list[ToolCall] = [] if len(response.content) > 0: for content_part in response.content: if content_part.type == "text": content = strip_xml_tags(string=content_part.text, tag="thinking") if content_part.type == "tool_use": # hack for incorrect tool format tool_input = json.loads(json.dumps(content_part.input)) # Check if id is a string before calling startswith (sometimes it's an int) if ( "id" in tool_input and isinstance(tool_input["id"], str) and tool_input["id"].startswith("toolu_") and "function" in tool_input ): if isinstance(tool_input["function"], str): tool_input["function"] = json.loads(tool_input["function"]) arguments = json.dumps(tool_input["function"]["arguments"], indent=2) try: args_json = json.loads(arguments) if not isinstance(args_json, dict): raise LLMServerError("Expected parseable json object for arguments") except: arguments = str(tool_input["function"]["arguments"]) else: arguments = json.dumps(tool_input, indent=2) tool_calls.append( ToolCall( id=content_part.id, type="function", function=FunctionCall( name=content_part.name, arguments=arguments, ), ) ) if content_part.type == "thinking": reasoning_content = content_part.thinking reasoning_content_signature = content_part.signature if content_part.type == "redacted_thinking": redacted_reasoning_content = content_part.data else: # Log the full response for debugging logger.error( "[Anthropic] Received response with empty content. Response ID: %s, Model: %s, Stop reason: %s, Full response: %s", response.id, response.model, response.stop_reason, json.dumps(response_data), ) raise LLMServerError( message=f"LLM provider returned empty content in response (ID: {response.id}, model: {response.model}, stop_reason: {response.stop_reason})", code=ErrorCode.INTERNAL_SERVER_ERROR, details={ "response_id": response.id, "model": response.model, "stop_reason": response.stop_reason, }, ) assert response.role == "assistant" choice = Choice( index=0, finish_reason=finish_reason, message=ChoiceMessage( role=response.role, content=content, reasoning_content=reasoning_content, reasoning_content_signature=reasoning_content_signature, redacted_reasoning_content=redacted_reasoning_content, tool_calls=tool_calls or None, ), ) chat_completion_response = ChatCompletionResponse( id=response.id, choices=[choice], created=get_utc_time_int(), model=response.model, usage=usage_stats, ) if llm_config.put_inner_thoughts_in_kwargs: chat_completion_response = unpack_all_inner_thoughts_from_kwargs( response=chat_completion_response, inner_thoughts_key=INNER_THOUGHTS_KWARG ) return chat_completion_response def _add_cache_control_to_system_message(self, system_content): """Add cache control to system message content.""" if isinstance(system_content, str): # For string content, convert to list format with cache control return [{"type": "text", "text": system_content, "cache_control": {"type": "ephemeral"}}] elif isinstance(system_content, list): # For list content, add cache control to the last text block cached_content = system_content.copy() for i in range(len(cached_content) - 1, -1, -1): if cached_content[i].get("type") == "text": cached_content[i]["cache_control"] = {"type": "ephemeral"} break return cached_content return system_content def _add_cache_control_to_messages(self, messages): """ Add cache control to the final content block of the final message. This enables incremental conversation caching per Anthropic docs: "During each turn, we mark the final block of the final message with cache_control so the conversation can be incrementally cached." Args: messages: List of Anthropic-formatted message dicts Returns: Modified messages list with cache_control on final block """ if not messages or len(messages) == 0: return messages # Work backwards to find the last message with content for i in range(len(messages) - 1, -1, -1): message = messages[i] content = message.get("content") if not content: continue # Handle string content if isinstance(content, str): messages[i]["content"] = [{"type": "text", "text": content, "cache_control": {"type": "ephemeral"}}] return messages # Handle list content - add cache_control to the last block if isinstance(content, list) and len(content) > 0: # Add cache_control to the last content block messages[i]["content"][-1]["cache_control"] = {"type": "ephemeral"} return messages return messages def _supports_structured_outputs(model: str) -> bool: """Check if the model supports structured outputs (strict mode). Only these 4 models are supported: - Claude Sonnet 4.5 - Claude Opus 4.1 - Claude Opus 4.5 - Claude Haiku 4.5 """ model_lower = model.lower() if "sonnet-4-5" in model_lower: return True elif "opus-4-1" in model_lower: return True elif "opus-4-5" in model_lower: return True elif "haiku-4-5" in model_lower: return True return False def convert_tools_to_anthropic_format( tools: List[OpenAITool], use_strict: bool = False, add_strict_field: bool = True, ) -> List[dict]: """See: https://docs.anthropic.com/claude/docs/tool-use OpenAI style: "tools": [{ "type": "function", "function": { "name": "find_movies", "description": "find ....", "parameters": { "type": "object", "properties": {...}, "required": List[str], } } }] Anthropic style: "tools": [{ "name": "find_movies", "description": "find ....", "input_schema": { "type": "object", "properties": {...}, "required": List[str], }, }] Args: tools: List of OpenAI-style tools to convert use_strict: If True, add additionalProperties: false to all object schemas add_strict_field: If True (and use_strict=True), add strict: true to allowlisted tools. Set to False for token counting endpoint which doesn't support this field. """ formatted_tools = [] strict_count = 0 for tool in tools: # Get the input schema input_schema = tool.function.parameters or {"type": "object", "properties": {}, "required": []} # Use the older lightweight cleanup: remove defaults and simplify union-with-null. # When using structured outputs (use_strict=True), also add additionalProperties: false to all object types. cleaned_schema = ( _clean_property_schema(input_schema, add_additional_properties_false=use_strict) if isinstance(input_schema, dict) else input_schema ) # Normalize to a safe "object" schema shape to avoid downstream assumptions failing. if isinstance(cleaned_schema, dict): if cleaned_schema.get("type") != "object": cleaned_schema["type"] = "object" if not isinstance(cleaned_schema.get("properties"), dict): cleaned_schema["properties"] = {} # Ensure additionalProperties: false for structured outputs on the top-level schema # Must override any existing additionalProperties: true as well if use_strict: cleaned_schema["additionalProperties"] = False formatted_tool: dict = { "name": tool.function.name, "description": tool.function.description if tool.function.description else "", "input_schema": cleaned_schema, } # Structured outputs "strict" mode: always attach `strict` for allowlisted tools # when we are using structured outputs models. Limit the number of strict tools # to avoid exceeding Anthropic constraints. # NOTE: The token counting endpoint does NOT support `strict` - only the messages endpoint does. if use_strict and add_strict_field and tool.function.name in ANTHROPIC_STRICT_MODE_ALLOWLIST: if strict_count < ANTHROPIC_MAX_STRICT_TOOLS: formatted_tool["strict"] = True strict_count += 1 else: logger.warning( f"Exceeded max strict tools limit ({ANTHROPIC_MAX_STRICT_TOOLS}), tool '{tool.function.name}' will not use strict mode" ) formatted_tools.append(formatted_tool) return formatted_tools def _clean_property_schema(schema: dict, add_additional_properties_false: bool = False) -> dict: """Older schema cleanup used for Anthropic tools. Removes / simplifies fields that commonly cause Anthropic tool schema issues: - Remove `default` values - Simplify nullable unions like {"type": ["null", "string"]} -> {"type": "string"} - Recurse through nested schemas (properties/items/anyOf/oneOf/allOf/etc.) - Optionally add additionalProperties: false to object types (required for structured outputs) """ if not isinstance(schema, dict): return schema cleaned: dict = {} # Simplify union types like ["null", "string"] to "string" if "type" in schema: t = schema.get("type") if isinstance(t, list): non_null = [x for x in t if x != "null"] if len(non_null) == 1: cleaned["type"] = non_null[0] elif len(non_null) > 1: cleaned["type"] = non_null else: cleaned["type"] = "string" else: cleaned["type"] = t for key, value in schema.items(): if key == "type": continue if key == "default": continue if key == "properties" and isinstance(value, dict): cleaned["properties"] = {k: _clean_property_schema(v, add_additional_properties_false) for k, v in value.items()} elif key == "items" and isinstance(value, dict): cleaned["items"] = _clean_property_schema(value, add_additional_properties_false) elif key in ("anyOf", "oneOf", "allOf") and isinstance(value, list): cleaned[key] = [_clean_property_schema(v, add_additional_properties_false) if isinstance(v, dict) else v for v in value] elif key in ("additionalProperties",) and isinstance(value, dict): cleaned[key] = _clean_property_schema(value, add_additional_properties_false) else: cleaned[key] = value # For structured outputs, Anthropic requires additionalProperties: false on all object types # We must override any existing additionalProperties: true as well if add_additional_properties_false and cleaned.get("type") == "object": cleaned["additionalProperties"] = False return cleaned def is_heartbeat(message: dict, is_ping: bool = False) -> bool: """Check if the message is an automated heartbeat ping""" if "role" not in message or message["role"] != "user" or "content" not in message: return False try: message_json = json.loads(message["content"]) except: return False # Check if message_json is a dict (not int, str, list, etc.) if not isinstance(message_json, dict): return False if "reason" not in message_json: return False if message_json.get("type") != "heartbeat": return False if not is_ping: # Just checking if 'type': 'heartbeat' return True else: # Also checking if it's specifically a 'ping' style message # NOTE: this will not catch tool rule heartbeats if REQ_HEARTBEAT_MESSAGE in message_json["reason"] or FUNC_FAILED_HEARTBEAT_MESSAGE in message_json["reason"]: return True else: return False def drop_heartbeats(messages: List[dict]): cleaned_messages = [] # Loop through messages # For messages with role 'user' and len(content) > 1, # Check if content[0].type == 'tool_result' # If so, iterate over content[1:] and while content.type == 'text' and is_heartbeat(content.text), # merge into content[0].content for message in messages: if "role" in message and "content" in message and message["role"] == "user": content_parts = message["content"] if isinstance(content_parts, str): if is_heartbeat({"role": "user", "content": content_parts}): continue elif isinstance(content_parts, list) and len(content_parts) == 1 and "text" in content_parts[0]: if is_heartbeat({"role": "user", "content": content_parts[0]["text"]}): continue # skip else: cleaned_parts = [] # Drop all the parts for content_part in content_parts: if "text" in content_part and is_heartbeat({"role": "user", "content": content_part["text"]}): continue # skip else: cleaned_parts.append(content_part) if len(cleaned_parts) == 0: continue else: message["content"] = cleaned_parts cleaned_messages.append(message) return cleaned_messages def merge_heartbeats_into_tool_responses(messages: List[dict]): """For extended thinking mode, we don't want anything other than tool responses in-between assistant actions Otherwise, the thinking will silently get dropped. NOTE: assumes merge_tool_results_into_user_messages has already been called """ merged_messages = [] # Loop through messages # For messages with role 'user' and len(content) > 1, # Check if content[0].type == 'tool_result' # If so, iterate over content[1:] and while content.type == 'text' and is_heartbeat(content.text), # merge into content[0].content for message in messages: if "role" not in message or "content" not in message: # Skip invalid messages merged_messages.append(message) continue if message["role"] == "user" and len(message["content"]) > 1: content_parts = message["content"] # If the first content part is a tool result, merge the heartbeat content into index 0 of the content # Two end cases: # 1. It was [tool_result, heartbeat], in which case merged result is [tool_result+heartbeat] (len 1) # 2. It was [tool_result, user_text], in which case it should be unchanged (len 2) if "type" in content_parts[0] and "content" in content_parts[0] and content_parts[0]["type"] == "tool_result": new_content_parts = [content_parts[0]] # If the first content part is a tool result, merge the heartbeat content into index 0 of the content for i, content_part in enumerate(content_parts[1:]): # If it's a heartbeat, add it to the merge if ( content_part["type"] == "text" and "text" in content_part and is_heartbeat({"role": "user", "content": content_part["text"]}) ): # NOTE: joining with a ',' new_content_parts[0]["content"] += ", " + content_part["text"] # If it's not, break, and concat to finish else: # Append the rest directly, no merging of content strings new_content_parts.extend(content_parts[i + 1 :]) break # Set the content_parts message["content"] = new_content_parts merged_messages.append(message) else: # Skip invalid messages parts merged_messages.append(message) continue else: merged_messages.append(message) return merged_messages def merge_tool_results_into_user_messages(messages: List[dict]): """Anthropic API doesn't allow role 'tool'->'user' sequences Example HTTP error: messages: roles must alternate between "user" and "assistant", but found multiple "user" roles in a row From: https://docs.anthropic.com/claude/docs/tool-use You may be familiar with other APIs that return tool use as separate from the model's primary output, or which use a special-purpose tool or function message role. In contrast, Anthropic's models and API are built around alternating user and assistant messages, where each message is an array of rich content blocks: text, image, tool_use, and tool_result. """ # TODO walk through the messages list # When a dict (dict_A) with 'role' == 'user' is followed by a dict with 'role' == 'user' (dict B), do the following # dict_A["content"] = dict_A["content"] + dict_B["content"] # The result should be a new merged_messages list that doesn't have any back-to-back dicts with 'role' == 'user' merged_messages = [] if not messages: return merged_messages # Start with the first message in the list current_message = messages[0] for next_message in messages[1:]: if current_message["role"] == "user" and next_message["role"] == "user": # Merge contents of the next user message into current one current_content = ( current_message["content"] if isinstance(current_message["content"], list) else [{"type": "text", "text": current_message["content"]}] ) next_content = ( next_message["content"] if isinstance(next_message["content"], list) else [{"type": "text", "text": next_message["content"]}] ) merged_content: list = current_content + next_content current_message["content"] = merged_content else: # Append the current message to result as it's complete merged_messages.append(current_message) # Move on to the next message current_message = next_message # Append the last processed message to the result merged_messages.append(current_message) return merged_messages def dedupe_tool_results_in_user_messages(messages: List[dict]) -> List[dict]: """Ensure each tool_use has a single tool_result within a user message. If multiple tool_result blocks with the same tool_use_id appear in the same user message (e.g., after merging consecutive user messages), merge their content and keep only one block. """ any_deduped = False dedup_counts: dict[str, int] = {} for msg in messages: if not isinstance(msg, dict): continue if msg.get("role") != "user": continue content = msg.get("content") if not isinstance(content, list) or len(content) == 0: continue seen: dict[str, dict] = {} new_content: list = [] for block in content: if isinstance(block, dict) and block.get("type") == "tool_result" and "tool_use_id" in block: tid = block.get("tool_use_id") if tid in seen: # Merge duplicate tool_result into the first occurrence first = seen[tid] extra = block.get("content") if extra: if isinstance(first.get("content"), str) and isinstance(extra, str): sep = "\n" if first["content"] and extra else "" first["content"] = f"{first['content']}{sep}{extra}" else: sep = "\n" if first.get("content") else "" # Fallback: coerce to strings and concat first["content"] = f"{first.get('content')}{sep}{extra}" any_deduped = True dedup_counts[tid] = dedup_counts.get(tid, 0) + 1 # Skip appending duplicate continue else: new_content.append(block) seen[tid] = block else: new_content.append(block) # Replace content if we pruned/merged duplicates if len(new_content) != len(content): msg["content"] = new_content if any_deduped: logger.error("[Anthropic] Deduped tool_result blocks in user messages: %s", dedup_counts) return messages def remap_finish_reason(stop_reason: str) -> str: """Remap Anthropic's 'stop_reason' to OpenAI 'finish_reason' OpenAI: 'stop', 'length', 'function_call', 'content_filter', null see: https://platform.openai.com/docs/guides/text-generation/chat-completions-api From: https://docs.anthropic.com/claude/reference/migrating-from-text-completions-to-messages#stop-reason Messages have a stop_reason of one of the following values: "end_turn": The conversational turn ended naturally. "stop_sequence": One of your specified custom stop sequences was generated. "max_tokens": (unchanged) """ if stop_reason == "end_turn": return "stop" elif stop_reason == "stop_sequence": return "stop" elif stop_reason == "max_tokens": return "length" elif stop_reason == "tool_use": return "function_call" else: raise LLMServerError(f"Unexpected stop_reason: {stop_reason}") def strip_xml_tags(string: str, tag: Optional[str]) -> str: if tag is None: return string # Construct the regular expression pattern to find the start and end tags tag_pattern = f"<{tag}.*?>|" # Use the regular expression to replace the tags with an empty string return re.sub(tag_pattern, "", string) def strip_xml_tags_streaming(string: str, tag: Optional[str]) -> str: if tag is None: return string # Handle common partial tag cases parts_to_remove = [ "<", # Leftover start bracket f"<{tag}", # Opening tag start f"", # Closing tag end f"{tag}>", # Opening tag end f"/{tag}", # Partial closing tag without > ">", # Leftover end bracket ] result = string for part in parts_to_remove: result = result.replace(part, "") return result