feat: anthropic class improvements (#1425)

This commit is contained in:
cthomas
2025-03-27 08:47:54 -07:00
committed by GitHub
parent c1001e482d
commit c2f79ac61f
6 changed files with 215 additions and 196 deletions

View File

@@ -302,10 +302,8 @@ class Agent(BaseAgent):
log_telemetry(self.logger, "_get_ai_reply create start")
# New LLM client flow
llm_client = LLMClient.create(
agent_id=self.agent_state.id,
llm_config=self.agent_state.llm_config,
put_inner_thoughts_first=put_inner_thoughts_first,
actor_id=self.agent_state.created_by_id,
)
if llm_client and not stream:

View File

@@ -62,6 +62,26 @@ class LLMError(LettaError):
pass
class LLMConnectionError(LLMError):
"""Error when unable to connect to LLM service"""
class LLMRateLimitError(LLMError):
"""Error when rate limited by LLM service"""
class LLMPermissionDeniedError(LLMError):
"""Error when permission is denied by LLM service"""
class LLMNotFoundError(LLMError):
"""Error when requested resource is not found"""
class LLMUnprocessableEntityError(LLMError):
"""Error when request is well-formed but semantically invalid"""
class BedrockPermissionError(LettaError):
"""Exception raised for errors in the Bedrock permission process."""

View File

@@ -7,12 +7,11 @@ from anthropic.types import Message as AnthropicMessage
from letta.helpers.datetime_helpers import get_utc_time
from letta.llm_api.helpers import add_inner_thoughts_to_functions, unpack_all_inner_thoughts_from_kwargs
from letta.llm_api.llm_api_tools import cast_message_to_subtype
from letta.llm_api.llm_client_base import LLMClientBase
from letta.local_llm.constants import INNER_THOUGHTS_KWARG, INNER_THOUGHTS_KWARG_DESCRIPTION
from letta.log import get_logger
from letta.schemas.message import Message as PydanticMessage
from letta.schemas.openai.chat_completion_request import ChatCompletionRequest, Tool
from letta.schemas.openai.chat_completion_request import Tool
from letta.schemas.openai.chat_completion_response import ChatCompletionResponse, Choice, FunctionCall
from letta.schemas.openai.chat_completion_response import Message as ChoiceMessage
from letta.schemas.openai.chat_completion_response import ToolCall, UsageStatistics
@@ -26,20 +25,14 @@ logger = get_logger(__name__)
class AnthropicClient(LLMClientBase):
def request(self, request_data: dict) -> dict:
try:
client = self._get_anthropic_client(async_client=False)
response = client.beta.messages.create(**request_data, betas=["tools-2024-04-04"])
return response.model_dump()
except Exception as e:
self._handle_anthropic_error(e)
client = self._get_anthropic_client(async_client=False)
response = client.beta.messages.create(**request_data, betas=["tools-2024-04-04"])
return response.model_dump()
async def request_async(self, request_data: dict) -> dict:
try:
client = self._get_anthropic_client(async_client=True)
response = await client.beta.messages.create(**request_data, betas=["tools-2024-04-04"])
return response.model_dump()
except Exception as e:
self._handle_anthropic_error(e)
client = self._get_anthropic_client(async_client=True)
response = await client.beta.messages.create(**request_data, betas=["tools-2024-04-04"])
return response.model_dump()
def _get_anthropic_client(self, async_client: bool = False) -> Union[anthropic.AsyncAnthropic, anthropic.Anthropic]:
override_key = ProviderManager().get_anthropic_override_key()
@@ -47,15 +40,6 @@ class AnthropicClient(LLMClientBase):
return anthropic.AsyncAnthropic(api_key=override_key) if override_key else anthropic.AsyncAnthropic()
return anthropic.Anthropic(api_key=override_key) if override_key else anthropic.Anthropic()
def _handle_anthropic_error(self, e: Exception):
if isinstance(e, anthropic.APIConnectionError):
logger.warning(f"[Anthropic] API connection error: {e.__cause__}")
elif isinstance(e, anthropic.RateLimitError):
logger.warning("[Anthropic] Rate limited (429). Consider backoff.")
elif isinstance(e, anthropic.APIStatusError):
logger.warning(f"[Anthropic] API status error: {e.status_code}, {e.response}")
raise e
def build_request_data(
self,
messages: List[PydanticMessage],
@@ -63,43 +47,155 @@ class AnthropicClient(LLMClientBase):
tool_call: Optional[str],
force_tool_call: Optional[str] = None,
) -> dict:
prefix_fill = True
if not self.use_tool_naming:
raise NotImplementedError("Only tool calling supported on Anthropic API requests")
if tools is None:
# Special case for summarization path
available_tools = None
tool_choice = None
elif force_tool_call is not None:
assert tools is not None
tool_choice = {"type": "tool", "name": force_tool_call}
available_tools = [{"type": "function", "function": f} for f in tools if f["name"] == force_tool_call]
if not self.llm_config.max_tokens:
raise ValueError("Max tokens must be set for anthropic")
# need to have this setting to be able to put inner thoughts in kwargs
self.llm_config.put_inner_thoughts_in_kwargs = True
else:
if self.llm_config.put_inner_thoughts_in_kwargs:
# tool_choice_type other than "auto" only plays nice if thinking goes inside the tool calls
tool_choice = {"type": "any", "disable_parallel_tool_use": True}
else:
tool_choice = {"type": "auto", "disable_parallel_tool_use": True}
available_tools = [{"type": "function", "function": f} for f in tools]
data = {
"model": self.llm_config.model,
"max_tokens": self.llm_config.max_tokens,
"temperature": self.llm_config.temperature,
}
chat_completion_request = ChatCompletionRequest(
model=self.llm_config.model,
messages=[cast_message_to_subtype(m.to_openai_dict()) for m in messages],
tools=available_tools,
tool_choice=tool_choice,
max_tokens=self.llm_config.max_tokens, # Note: max_tokens is required for Anthropic API
temperature=self.llm_config.temperature,
# Extended Thinking
if self.llm_config.enable_reasoner:
assert (
self.llm_config.max_reasoning_tokens is not None and self.llm_config.max_reasoning_tokens < self.llm_config.max_tokens
), "max tokens must be greater than thinking budget"
assert not self.llm_config.put_inner_thoughts_in_kwargs, "extended thinking not compatible with put_inner_thoughts_in_kwargs"
data["thinking"] = {
"type": "enabled",
"budget_tokens": self.llm_config.max_reasoning_tokens,
}
# `temperature` may only be set to 1 when thinking is enabled. Please consult our documentation at https://docs.anthropic.com/en/docs/build-with-claude/extended-thinking#important-considerations-when-using-extended-thinking
data["temperature"] = 1.0
# Silently disable prefix_fill for now
prefix_fill = False
# Tools
tools_for_request = (
[Tool(function=f) for f in tools if f["name"] == force_tool_call]
if force_tool_call is not None
else [Tool(function=f) for f in tools]
)
if force_tool_call is not None:
self.llm_config.put_inner_thoughts_in_kwargs = True # why do we do this ?
return _prepare_anthropic_request(
data=chat_completion_request,
put_inner_thoughts_in_kwargs=self.llm_config.put_inner_thoughts_in_kwargs,
extended_thinking=self.llm_config.enable_reasoner,
max_reasoning_tokens=self.llm_config.max_reasoning_tokens,
)
# Add inner thoughts kwarg
if len(tools_for_request) > 0 and self.llm_config.put_inner_thoughts_in_kwargs:
tools_with_inner_thoughts = add_inner_thoughts_to_functions(
functions=[t.function for t in tools_for_request],
inner_thoughts_key=INNER_THOUGHTS_KWARG,
inner_thoughts_description=INNER_THOUGHTS_KWARG_DESCRIPTION,
)
tools_for_request = [Tool(function=f) for f in tools_with_inner_thoughts]
if len(tools_for_request) > 0:
# TODO eventually enable parallel tool use
data["tools"] = convert_tools_to_anthropic_format(tools_for_request)
# Messages
inner_thoughts_xml_tag = "thinking"
data["messages"] = [
m.to_anthropic_dict(
inner_thoughts_xml_tag=inner_thoughts_xml_tag,
put_inner_thoughts_in_kwargs=self.llm_config.put_inner_thoughts_in_kwargs,
)
for m in messages
]
# Move 'system' to the top level
# assert data["messages"][0]["role"] == "system", f"Expected 'system' role in messages[0]:\n{data['messages'][0]}"
data["system"] = data["messages"][0]["content"]
data["messages"] = data["messages"][1:]
# Ensure first message is user
if data["messages"][0]["role"] != "user":
data["messages"] = [{"role": "user", "content": DUMMY_FIRST_USER_MESSAGE}] + data["messages"]
# Handle alternating messages
data["messages"] = merge_tool_results_into_user_messages(data["messages"])
# Prefix fill
# https://docs.anthropic.com/en/api/messages#body-messages
# NOTE: cannot prefill with tools for opus:
# Your API request included an `assistant` message in the final position, which would pre-fill the `assistant` response. When using tools with "claude-3-opus-20240229"
if prefix_fill and not self.llm_config.put_inner_thoughts_in_kwargs and "opus" not in data["model"]:
data["messages"].append(
# Start the thinking process for the assistant
{"role": "assistant", "content": f"<{inner_thoughts_xml_tag}>"},
)
return data
def handle_llm_error(self, e: Exception) -> Exception:
if isinstance(e, anthropic.APIConnectionError):
logger.warning(f"[Anthropic] API connection error: {e.__cause__}")
return LLMConnectionError(
message=f"Failed to connect to Anthropic: {str(e)}",
code=ErrorCode.INTERNAL_SERVER_ERROR,
details={"cause": str(e.__cause__) if e.__cause__ else None},
)
if isinstance(e, anthropic.RateLimitError):
logger.warning("[Anthropic] Rate limited (429). Consider backoff.")
return LLMRateLimitError(
message=f"Rate limited by Anthropic: {str(e)}",
code=ErrorCode.RATE_LIMIT_EXCEEDED,
)
if isinstance(e, anthropic.BadRequestError):
logger.warning(f"[Anthropic] Bad request: {str(e)}")
return LLMBadRequestError(
message=f"Bad request to Anthropic: {str(e)}",
code=ErrorCode.INTERNAL_SERVER_ERROR,
)
if isinstance(e, anthropic.AuthenticationError):
logger.warning(f"[Anthropic] Authentication error: {str(e)}")
return LLMAuthenticationError(
message=f"Authentication failed with Anthropic: {str(e)}",
code=ErrorCode.INTERNAL_SERVER_ERROR,
)
if isinstance(e, anthropic.PermissionDeniedError):
logger.warning(f"[Anthropic] Permission denied: {str(e)}")
return LLMPermissionDeniedError(
message=f"Permission denied by Anthropic: {str(e)}",
code=ErrorCode.INTERNAL_SERVER_ERROR,
)
if isinstance(e, anthropic.NotFoundError):
logger.warning(f"[Anthropic] Resource not found: {str(e)}")
return LLMNotFoundError(
message=f"Resource not found in Anthropic: {str(e)}",
code=ErrorCode.INTERNAL_SERVER_ERROR,
)
if isinstance(e, anthropic.UnprocessableEntityError):
logger.warning(f"[Anthropic] Unprocessable entity: {str(e)}")
return LLMUnprocessableEntityError(
message=f"Invalid request content for Anthropic: {str(e)}",
code=ErrorCode.INTERNAL_SERVER_ERROR,
)
if isinstance(e, anthropic.APIStatusError):
logger.warning(f"[Anthropic] API status error: {str(e)}")
return LLMServerError(
message=f"Anthropic API error: {str(e)}",
code=ErrorCode.INTERNAL_SERVER_ERROR,
details={
"status_code": e.status_code if hasattr(e, "status_code") else None,
"response": str(e.response) if hasattr(e, "response") else None,
},
)
return super().handle_llm_error(e)
def convert_response_to_chat_completion(
self,
@@ -208,118 +304,6 @@ class AnthropicClient(LLMClientBase):
return chat_completion_response
def _prepare_anthropic_request(
data: ChatCompletionRequest,
inner_thoughts_xml_tag: Optional[str] = "thinking",
# if true, prefix fill the generation with the thinking tag
prefix_fill: bool = True,
# if true, put COT inside the tool calls instead of inside the content
put_inner_thoughts_in_kwargs: bool = False,
bedrock: bool = False,
# extended thinking related fields
# https://docs.anthropic.com/en/docs/build-with-claude/extended-thinking
extended_thinking: bool = False,
max_reasoning_tokens: Optional[int] = None,
) -> dict:
"""Prepare the request data for Anthropic API format."""
if extended_thinking:
assert (
max_reasoning_tokens is not None and max_reasoning_tokens < data.max_tokens
), "max tokens must be greater than thinking budget"
assert not put_inner_thoughts_in_kwargs, "extended thinking not compatible with put_inner_thoughts_in_kwargs"
# assert not prefix_fill, "extended thinking not compatible with prefix_fill"
# Silently disable prefix_fill for now
prefix_fill = False
# if needed, put inner thoughts as a kwarg for all tools
if data.tools and put_inner_thoughts_in_kwargs:
functions = add_inner_thoughts_to_functions(
functions=[t.function.model_dump() for t in data.tools],
inner_thoughts_key=INNER_THOUGHTS_KWARG,
inner_thoughts_description=INNER_THOUGHTS_KWARG_DESCRIPTION,
)
data.tools = [Tool(function=f) for f in functions]
# convert the tools to Anthropic's payload format
anthropic_tools = None if data.tools is None else convert_tools_to_anthropic_format(data.tools)
# pydantic -> dict
data = data.model_dump(exclude_none=True)
if extended_thinking:
data["thinking"] = {
"type": "enabled",
"budget_tokens": max_reasoning_tokens,
}
# `temperature` may only be set to 1 when thinking is enabled. Please consult our documentation at https://docs.anthropic.com/en/docs/build-with-claude/extended-thinking#important-considerations-when-using-extended-thinking
data["temperature"] = 1.0
if "functions" in data:
raise ValueError(f"'functions' unexpected in Anthropic API payload")
# Handle tools
if "tools" in data and data["tools"] is None:
data.pop("tools")
data.pop("tool_choice", None)
elif anthropic_tools is not None:
# TODO eventually enable parallel tool use
data["tools"] = anthropic_tools
# Move 'system' to the top level
assert data["messages"][0]["role"] == "system", f"Expected 'system' role in messages[0]:\n{data['messages'][0]}"
data["system"] = data["messages"][0]["content"]
data["messages"] = data["messages"][1:]
# Process messages
for message in data["messages"]:
if "content" not in message:
message["content"] = None
# Convert to Anthropic format
msg_objs = [
PydanticMessage.dict_to_message(
user_id=None,
agent_id=None,
openai_message_dict=m,
)
for m in data["messages"]
]
data["messages"] = [
m.to_anthropic_dict(
inner_thoughts_xml_tag=inner_thoughts_xml_tag,
put_inner_thoughts_in_kwargs=put_inner_thoughts_in_kwargs,
)
for m in msg_objs
]
# Ensure first message is user
if data["messages"][0]["role"] != "user":
data["messages"] = [{"role": "user", "content": DUMMY_FIRST_USER_MESSAGE}] + data["messages"]
# Handle alternating messages
data["messages"] = merge_tool_results_into_user_messages(data["messages"])
# Handle prefix fill (not compatible with inner-thoughts-in-kwargs)
# https://docs.anthropic.com/en/api/messages#body-messages
# NOTE: cannot prefill with tools for opus:
# Your API request included an `assistant` message in the final position, which would pre-fill the `assistant` response. When using tools with "claude-3-opus-20240229"
if prefix_fill and not put_inner_thoughts_in_kwargs and "opus" not in data["model"]:
if not bedrock:  # not supported for bedrock
data["messages"].append(
# Start the thinking process for the assistant
{"role": "assistant", "content": f"<{inner_thoughts_xml_tag}>"},
)
# Validate max_tokens
assert "max_tokens" in data, data
# Remove OpenAI-specific fields
for field in ["frequency_penalty", "logprobs", "n", "top_p", "presence_penalty", "user", "stream"]:
data.pop(field, None)
return data
def convert_tools_to_anthropic_format(tools: List[Tool]) -> List[dict]:
"""See: https://docs.anthropic.com/claude/docs/tool-use

View File

@@ -9,21 +9,17 @@ class LLMClient:
@staticmethod
def create(
agent_id: str,
llm_config: LLMConfig,
put_inner_thoughts_first: bool = True,
actor_id: Optional[str] = None,
) -> Optional[LLMClientBase]:
"""
Create an LLM client based on the model endpoint type.
Args:
agent_id: Unique identifier for the agent
llm_config: Configuration for the LLM model
put_inner_thoughts_first: Whether to put inner thoughts first in the response
use_structured_output: Whether to use structured output
use_tool_naming: Whether to use tool naming
actor_id: Optional actor identifier
Returns:
An instance of LLMClientBase subclass
@@ -36,19 +32,22 @@ class LLMClient:
from letta.llm_api.google_ai_client import GoogleAIClient
return GoogleAIClient(
agent_id=agent_id, llm_config=llm_config, put_inner_thoughts_first=put_inner_thoughts_first, actor_id=actor_id
llm_config=llm_config,
put_inner_thoughts_first=put_inner_thoughts_first,
)
case "google_vertex":
from letta.llm_api.google_vertex_client import GoogleVertexClient
return GoogleVertexClient(
agent_id=agent_id, llm_config=llm_config, put_inner_thoughts_first=put_inner_thoughts_first, actor_id=actor_id
llm_config=llm_config,
put_inner_thoughts_first=put_inner_thoughts_first,
)
case "anthropic":
from letta.llm_api.anthropic_client import AnthropicClient
return AnthropicClient(
agent_id=agent_id, llm_config=llm_config, put_inner_thoughts_first=put_inner_thoughts_first, actor_id=actor_id
llm_config=llm_config,
put_inner_thoughts_first=put_inner_thoughts_first,
)
case _:
return None

View File

@@ -18,17 +18,13 @@ class LLMClientBase:
def __init__(
self,
agent_id: str,
llm_config: LLMConfig,
put_inner_thoughts_first: Optional[bool] = True,
use_structured_output: Optional[bool] = True,
use_tool_naming: bool = True,
actor_id: Optional[str] = None,
):
self.agent_id = agent_id
self.llm_config = llm_config
self.put_inner_thoughts_first = put_inner_thoughts_first
self.actor_id = actor_id
self.use_tool_naming = use_tool_naming
def send_llm_request(
@@ -46,13 +42,19 @@ class LLMClientBase:
Otherwise returns a ChatCompletionResponse.
"""
request_data = self.build_request_data(messages, tools, tool_call)
log_event(name="llm_request_sent", attributes=request_data)
if stream:
return self.stream(request_data)
else:
response_data = self.request(request_data)
response_data = {}
try:
log_event(name="llm_request_sent", attributes=request_data)
if stream:
return self.stream(request_data)
else:
response_data = self.request(request_data)
log_event(name="llm_response_received", attributes=response_data)
return self.convert_response_to_chat_completion(response_data, messages)
except Exception as e:
raise self.handle_llm_error(e)
return self.convert_response_to_chat_completion(response_data, messages)
async def send_llm_request_async(
self,
@@ -68,14 +70,20 @@ class LLMClientBase:
If stream=True, returns an AsyncStream[ChatCompletionChunk] that can be async iterated over.
Otherwise returns a ChatCompletionResponse.
"""
request_data = self.build_request_data(messages, tools, tool_call)
log_event(name="llm_request_sent", attributes=request_data)
if stream:
return await self.stream_async(request_data)
else:
response_data = await self.request_async(request_data)
request_data = self.build_request_data(messages, tools, tool_call, force_tool_call)
response_data = {}
try:
log_event(name="llm_request_sent", attributes=request_data)
if stream:
return await self.stream_async(request_data)
else:
response_data = await self.request_async(request_data)
log_event(name="llm_response_received", attributes=response_data)
return self.convert_response_to_chat_completion(response_data, messages)
except Exception as e:
raise self.handle_llm_error(e)
return self.convert_response_to_chat_completion(response_data, messages)
@abstractmethod
def build_request_data(
@@ -129,3 +137,17 @@ class LLMClientBase:
Performs underlying streaming request to llm and returns raw response.
"""
raise NotImplementedError(f"Streaming is not supported for {self.llm_config.model_endpoint_type}")
@abstractmethod
def handle_llm_error(self, e: Exception) -> Exception:
"""
Maps provider-specific errors to common LLMError types.
Each LLM provider should implement this to translate their specific errors.
Args:
e: The original provider-specific exception
Returns:
An LLMError subclass that represents the error in a provider-agnostic way
"""
return LLMError(f"Unhandled LLM error: {str(e)}")

View File

@@ -104,11 +104,7 @@ def check_first_response_is_valid_for_llm_endpoint(filename: str, validate_inner
messages = client.server.agent_manager.get_in_context_messages(agent_id=full_agent_state.id, actor=client.user)
agent = Agent(agent_state=full_agent_state, interface=None, user=client.user)
llm_client = LLMClient.create(
agent_id=agent_state.id,
llm_config=agent_state.llm_config,
actor_id=str(uuid.UUID(int=1)),
)
llm_client = LLMClient.create(llm_config=agent_state.llm_config)
if llm_client:
response = llm_client.send_llm_request(
messages=messages,