From edf39c565f5ce16c1fe774b0f4c514b562d86f4b Mon Sep 17 00:00:00 2001 From: cthomas Date: Wed, 13 Aug 2025 15:09:13 -0700 Subject: [PATCH] chore: clean up legacy anthropic path (#3905) --- letta/llm_api/anthropic.py | 327 +-------------------------------- letta/llm_api/llm_api_tools.py | 89 +-------- 2 files changed, 2 insertions(+), 414 deletions(-) diff --git a/letta/llm_api/anthropic.py b/letta/llm_api/anthropic.py index 9e03aa3c..4c0999c0 100644 --- a/letta/llm_api/anthropic.py +++ b/letta/llm_api/anthropic.py @@ -20,16 +20,14 @@ from anthropic.types.beta import ( ) from letta.errors import BedrockError, BedrockPermissionError, ErrorCode, LLMAuthenticationError, LLMError -from letta.helpers.datetime_helpers import get_utc_time_int, timestamp_to_datetime +from letta.helpers.datetime_helpers import get_utc_time_int from letta.llm_api.aws_bedrock import get_bedrock_client from letta.llm_api.helpers import add_inner_thoughts_to_functions from letta.local_llm.constants import INNER_THOUGHTS_KWARG, INNER_THOUGHTS_KWARG_DESCRIPTION -from letta.local_llm.utils import num_tokens_from_functions, num_tokens_from_messages from letta.log import get_logger from letta.otel.tracing import log_event from letta.schemas.enums import ProviderCategory from letta.schemas.message import Message as _Message -from letta.schemas.message import MessageRole as _MessageRole from letta.schemas.openai.chat_completion_request import ChatCompletionRequest, Tool from letta.schemas.openai.chat_completion_response import ( ChatCompletionChunkResponse, @@ -39,13 +37,11 @@ from letta.schemas.openai.chat_completion_response import ( FunctionCall, FunctionCallDelta, ) -from letta.schemas.openai.chat_completion_response import Message from letta.schemas.openai.chat_completion_response import Message as ChoiceMessage from letta.schemas.openai.chat_completion_response import MessageDelta, ToolCall, ToolCallDelta, UsageStatistics from letta.services.provider_manager import ProviderManager from letta.services.user_manager import UserManager from letta.settings import model_settings -from letta.streaming_interface import AgentChunkStreamingInterface, AgentRefreshStreamingInterface logger = get_logger(__name__) @@ -786,43 +782,6 @@ def _prepare_anthropic_request( return data -def anthropic_chat_completions_request( - data: ChatCompletionRequest, - inner_thoughts_xml_tag: Optional[str] = "thinking", - put_inner_thoughts_in_kwargs: bool = False, - extended_thinking: bool = False, - max_reasoning_tokens: Optional[int] = None, - provider_name: Optional[str] = None, - provider_category: Optional[ProviderCategory] = None, - betas: List[str] = ["tools-2024-04-04"], - user_id: Optional[str] = None, -) -> ChatCompletionResponse: - """https://docs.anthropic.com/claude/docs/tool-use""" - anthropic_client = None - if provider_category == ProviderCategory.byok: - actor = UserManager().get_user_or_default(user_id=user_id) - api_key = ProviderManager().get_override_key(provider_name, actor=actor) - anthropic_client = anthropic.Anthropic(api_key=api_key) - elif model_settings.anthropic_api_key: - anthropic_client = anthropic.Anthropic() - else: - raise ValueError("No available Anthropic API key") - data = _prepare_anthropic_request( - data=data, - inner_thoughts_xml_tag=inner_thoughts_xml_tag, - put_inner_thoughts_in_kwargs=put_inner_thoughts_in_kwargs, - extended_thinking=extended_thinking, - max_reasoning_tokens=max_reasoning_tokens, - ) - log_event(name="llm_request_sent", attributes=data) - response = anthropic_client.beta.messages.create( - **data, - betas=betas, - ) - log_event(name="llm_response_received", attributes={"response": response.json()}) - return convert_anthropic_response_to_chatcompletion(response=response, inner_thoughts_xml_tag=inner_thoughts_xml_tag) - - def anthropic_bedrock_chat_completions_request( data: ChatCompletionRequest, inner_thoughts_xml_tag: Optional[str] = "thinking", @@ -920,287 +879,3 @@ def anthropic_chat_completions_request_stream( message_id = chunk.message.id model = chunk.message.model yield convert_anthropic_stream_event_to_chatcompletion(chunk, message_id, model, inner_thoughts_xml_tag) - - -def anthropic_chat_completions_process_stream( - chat_completion_request: ChatCompletionRequest, - stream_interface: Optional[Union[AgentChunkStreamingInterface, AgentRefreshStreamingInterface]] = None, - inner_thoughts_xml_tag: Optional[str] = "thinking", - put_inner_thoughts_in_kwargs: bool = False, - extended_thinking: bool = False, - max_reasoning_tokens: Optional[int] = None, - provider_name: Optional[str] = None, - provider_category: Optional[ProviderCategory] = None, - create_message_id: bool = True, - create_message_datetime: bool = True, - betas: List[str] = ["tools-2024-04-04"], - name: Optional[str] = None, - user_id: Optional[str] = None, -) -> ChatCompletionResponse: - """Process a streaming completion response from Anthropic, similar to OpenAI's streaming. - - Args: - api_key: The Anthropic API key - chat_completion_request: The chat completion request - stream_interface: Interface for handling streaming chunks - inner_thoughts_xml_tag: Tag for inner thoughts in the response - create_message_id: Whether to create a message ID - create_message_datetime: Whether to create message datetime - betas: Beta features to enable - - Returns: - The final ChatCompletionResponse - """ - assert chat_completion_request.stream == True - assert stream_interface is not None, "Required" - - # Count prompt tokens - we'll get completion tokens from the final response - chat_history = [m.model_dump(exclude_none=True) for m in chat_completion_request.messages] - prompt_tokens = num_tokens_from_messages( - messages=chat_history, - model=chat_completion_request.model, - ) - - # Add tokens for tools if present - if chat_completion_request.tools is not None: - assert chat_completion_request.functions is None - prompt_tokens += num_tokens_from_functions( - functions=[t.function.model_dump() for t in chat_completion_request.tools], - model=chat_completion_request.model, - ) - elif chat_completion_request.functions is not None: - assert chat_completion_request.tools is None - prompt_tokens += num_tokens_from_functions( - functions=[f.model_dump() for f in chat_completion_request.functions], - model=chat_completion_request.model, - ) - - # Create a dummy message for ID/datetime if needed - dummy_message = _Message( - role=_MessageRole.assistant, - content=[], - agent_id="", - model="", - name=None, - tool_calls=None, - tool_call_id=None, - ) - - TEMP_STREAM_RESPONSE_ID = "temp_id" - TEMP_STREAM_FINISH_REASON = "temp_null" - TEMP_STREAM_TOOL_CALL_ID = "temp_id" - chat_completion_response = ChatCompletionResponse( - id=dummy_message.id if create_message_id else TEMP_STREAM_RESPONSE_ID, - choices=[], - created=int(dummy_message.created_at.timestamp()), - model=chat_completion_request.model, - usage=UsageStatistics( - prompt_tokens=prompt_tokens, - total_tokens=prompt_tokens, - ), - ) - - log_event(name="llm_request_sent", attributes=chat_completion_request.model_dump()) - - if stream_interface: - stream_interface.stream_start() - - completion_tokens = 0 - prev_message_type = None - message_idx = 0 - try: - for chunk_idx, chat_completion_chunk in enumerate( - anthropic_chat_completions_request_stream( - data=chat_completion_request, - inner_thoughts_xml_tag=inner_thoughts_xml_tag, - put_inner_thoughts_in_kwargs=put_inner_thoughts_in_kwargs, - extended_thinking=extended_thinking, - max_reasoning_tokens=max_reasoning_tokens, - provider_name=provider_name, - provider_category=provider_category, - betas=betas, - user_id=user_id, - ) - ): - assert isinstance(chat_completion_chunk, ChatCompletionChunkResponse), type(chat_completion_chunk) - - if stream_interface: - if isinstance(stream_interface, AgentChunkStreamingInterface): - message_type = stream_interface.process_chunk( - chat_completion_chunk, - message_id=chat_completion_response.id if create_message_id else chat_completion_chunk.id, - message_date=( - timestamp_to_datetime(chat_completion_response.created) - if create_message_datetime - else timestamp_to_datetime(chat_completion_chunk.created) - ), - # if extended_thinking is on, then reasoning_content will be flowing as chunks - # TODO handle emitting redacted reasoning content (e.g. as concat?) - expect_reasoning_content=extended_thinking, - name=name, - message_index=message_idx, - prev_message_type=prev_message_type, - ) - if message_type != prev_message_type and message_type is not None and prev_message_type is not None: - message_idx += 1 - if message_type is not None: - prev_message_type = message_type - elif isinstance(stream_interface, AgentRefreshStreamingInterface): - stream_interface.process_refresh(chat_completion_response) - else: - raise TypeError(stream_interface) - - if chunk_idx == 0: - # initialize the choice objects which we will increment with the deltas - num_choices = len(chat_completion_chunk.choices) - assert num_choices > 0 - chat_completion_response.choices = [ - Choice( - finish_reason=TEMP_STREAM_FINISH_REASON, # NOTE: needs to be ovrerwritten - index=i, - message=Message( - role="assistant", - ), - ) - for i in range(len(chat_completion_chunk.choices)) - ] - - # add the choice delta - assert len(chat_completion_chunk.choices) == len(chat_completion_response.choices), chat_completion_chunk - for chunk_choice in chat_completion_chunk.choices: - if chunk_choice.finish_reason is not None: - chat_completion_response.choices[chunk_choice.index].finish_reason = chunk_choice.finish_reason - - if chunk_choice.logprobs is not None: - chat_completion_response.choices[chunk_choice.index].logprobs = chunk_choice.logprobs - - accum_message = chat_completion_response.choices[chunk_choice.index].message - message_delta = chunk_choice.delta - - if message_delta.content is not None: - content_delta = message_delta.content - if accum_message.content is None: - accum_message.content = content_delta - else: - accum_message.content += content_delta - - # NOTE: for extended_thinking mode - if extended_thinking and message_delta.reasoning_content is not None: - reasoning_content_delta = message_delta.reasoning_content - if accum_message.reasoning_content is None: - accum_message.reasoning_content = reasoning_content_delta - else: - accum_message.reasoning_content += reasoning_content_delta - - # NOTE: extended_thinking sends a signature - if extended_thinking and message_delta.reasoning_content_signature is not None: - reasoning_content_signature_delta = message_delta.reasoning_content_signature - if accum_message.reasoning_content_signature is None: - accum_message.reasoning_content_signature = reasoning_content_signature_delta - else: - accum_message.reasoning_content_signature += reasoning_content_signature_delta - - # NOTE: extended_thinking also has the potential for redacted_reasoning_content - if extended_thinking and message_delta.redacted_reasoning_content is not None: - redacted_reasoning_content_delta = message_delta.redacted_reasoning_content - if accum_message.redacted_reasoning_content is None: - accum_message.redacted_reasoning_content = redacted_reasoning_content_delta - else: - accum_message.redacted_reasoning_content += redacted_reasoning_content_delta - - # TODO(charles) make sure this works for parallel tool calling? - if message_delta.tool_calls is not None: - tool_calls_delta = message_delta.tool_calls - - # If this is the first tool call showing up in a chunk, initialize the list with it - if accum_message.tool_calls is None: - accum_message.tool_calls = [ - ToolCall(id=TEMP_STREAM_TOOL_CALL_ID, function=FunctionCall(name="", arguments="")) - for _ in range(len(tool_calls_delta)) - ] - - # There may be many tool calls in a tool calls delta (e.g. parallel tool calls) - for tool_call_delta in tool_calls_delta: - if tool_call_delta.id is not None: - # TODO assert that we're not overwriting? - # TODO += instead of =? - if tool_call_delta.index not in range(len(accum_message.tool_calls)): - warnings.warn( - f"Tool call index out of range ({tool_call_delta.index})\ncurrent tool calls: {accum_message.tool_calls}\ncurrent delta: {tool_call_delta}" - ) - # force index 0 - # accum_message.tool_calls[0].id = tool_call_delta.id - else: - accum_message.tool_calls[tool_call_delta.index].id = tool_call_delta.id - if tool_call_delta.function is not None: - if tool_call_delta.function.name is not None: - # TODO assert that we're not overwriting? - # TODO += instead of =? - if tool_call_delta.index not in range(len(accum_message.tool_calls)): - warnings.warn( - f"Tool call index out of range ({tool_call_delta.index})\ncurrent tool calls: {accum_message.tool_calls}\ncurrent delta: {tool_call_delta}" - ) - # force index 0 - # accum_message.tool_calls[0].function.name = tool_call_delta.function.name - else: - accum_message.tool_calls[tool_call_delta.index].function.name = tool_call_delta.function.name - if tool_call_delta.function.arguments is not None: - if tool_call_delta.index not in range(len(accum_message.tool_calls)): - warnings.warn( - f"Tool call index out of range ({tool_call_delta.index})\ncurrent tool calls: {accum_message.tool_calls}\ncurrent delta: {tool_call_delta}" - ) - # force index 0 - # accum_message.tool_calls[0].function.arguments += tool_call_delta.function.arguments - else: - accum_message.tool_calls[tool_call_delta.index].function.arguments += tool_call_delta.function.arguments - - if message_delta.function_call is not None: - raise NotImplementedError("Old function_call style not support with stream=True") - - # overwrite response fields based on latest chunk - if not create_message_id: - chat_completion_response.id = chat_completion_chunk.id - if not create_message_datetime: - chat_completion_response.created = chat_completion_chunk.created - chat_completion_response.model = chat_completion_chunk.model - chat_completion_response.system_fingerprint = chat_completion_chunk.system_fingerprint - - # increment chunk counter - if chat_completion_chunk.output_tokens is not None: - completion_tokens += chat_completion_chunk.output_tokens - - except Exception as e: - if stream_interface: - stream_interface.stream_end() - print(f"Parsing ChatCompletion stream failed with error:\n{str(e)}") - raise e - finally: - if stream_interface: - stream_interface.stream_end() - - # make sure we didn't leave temp stuff in - assert all([c.finish_reason != TEMP_STREAM_FINISH_REASON for c in chat_completion_response.choices]) - assert all( - [ - all([tc.id != TEMP_STREAM_TOOL_CALL_ID for tc in c.message.tool_calls]) if c.message.tool_calls else True - for c in chat_completion_response.choices - ] - ) - if not create_message_id: - assert chat_completion_response.id != dummy_message.id - - # compute token usage before returning - # TODO try actually computing the #tokens instead of assuming the chunks is the same - chat_completion_response.usage.completion_tokens = completion_tokens - chat_completion_response.usage.total_tokens = prompt_tokens + completion_tokens - - assert len(chat_completion_response.choices) > 0, chat_completion_response - - log_event(name="llm_response_received", attributes=chat_completion_response.model_dump()) - - for choice in chat_completion_response.choices: - if choice.message.content is not None: - choice.message.content = choice.message.content.replace(f"<{inner_thoughts_xml_tag}>", "") - choice.message.content = choice.message.content.replace(f"", "") - - return chat_completion_response diff --git a/letta/llm_api/llm_api_tools.py b/letta/llm_api/llm_api_tools.py index a511c722..99974551 100644 --- a/letta/llm_api/llm_api_tools.py +++ b/letta/llm_api/llm_api_tools.py @@ -7,11 +7,7 @@ import requests from letta.constants import CLI_WARNING_PREFIX from letta.errors import LettaConfigurationError, RateLimitExceededError -from letta.llm_api.anthropic import ( - anthropic_bedrock_chat_completions_request, - anthropic_chat_completions_process_stream, - anthropic_chat_completions_request, -) +from letta.llm_api.anthropic import anthropic_bedrock_chat_completions_request from letta.llm_api.aws_bedrock import has_valid_aws_credentials from letta.llm_api.deepseek import build_deepseek_chat_completions_request, convert_deepseek_response_to_chatcompletion from letta.llm_api.helpers import add_inner_thoughts_to_functions, unpack_all_inner_thoughts_from_kwargs @@ -311,89 +307,6 @@ def create( return response - elif llm_config.model_endpoint_type == "anthropic": - if not use_tool_naming: - raise NotImplementedError("Only tool calling supported on Anthropic API requests") - - if llm_config.enable_reasoner: - llm_config.put_inner_thoughts_in_kwargs = False - - # Force tool calling - tool_call = None - if functions is None: - # Special case for summarization path - tools = None - tool_choice = None - elif force_tool_call is not None: - # tool_call = {"type": "function", "function": {"name": force_tool_call}} - tool_choice = {"type": "tool", "name": force_tool_call} - tools = [{"type": "function", "function": f} for f in functions if f["name"] == force_tool_call] - assert functions is not None - - # need to have this setting to be able to put inner thoughts in kwargs - llm_config.put_inner_thoughts_in_kwargs = True - else: - if llm_config.put_inner_thoughts_in_kwargs: - # tool_choice_type other than "auto" only plays nice if thinking goes inside the tool calls - tool_choice = {"type": "any", "disable_parallel_tool_use": True} - else: - tool_choice = {"type": "auto", "disable_parallel_tool_use": True} - tools = [{"type": "function", "function": f} for f in functions] if functions is not None else None - - chat_completion_request = ChatCompletionRequest( - model=llm_config.model, - messages=[cast_message_to_subtype(m.to_openai_dict()) for m in messages], - tools=tools, - tool_choice=tool_choice, - max_tokens=llm_config.max_tokens, # Note: max_tokens is required for Anthropic API - temperature=llm_config.temperature, - stream=stream, - ) - - # Handle streaming - if stream: # Client requested token streaming - assert isinstance(stream_interface, (AgentChunkStreamingInterface, AgentRefreshStreamingInterface)), type(stream_interface) - - stream_interface.inner_thoughts_in_kwargs = True - response = anthropic_chat_completions_process_stream( - chat_completion_request=chat_completion_request, - put_inner_thoughts_in_kwargs=llm_config.put_inner_thoughts_in_kwargs, - stream_interface=stream_interface, - extended_thinking=llm_config.enable_reasoner, - max_reasoning_tokens=llm_config.max_reasoning_tokens, - provider_name=llm_config.provider_name, - provider_category=llm_config.provider_category, - name=name, - user_id=user_id, - ) - - else: - # Client did not request token streaming (expect a blocking backend response) - response = anthropic_chat_completions_request( - data=chat_completion_request, - put_inner_thoughts_in_kwargs=llm_config.put_inner_thoughts_in_kwargs, - extended_thinking=llm_config.enable_reasoner, - max_reasoning_tokens=llm_config.max_reasoning_tokens, - provider_name=llm_config.provider_name, - provider_category=llm_config.provider_category, - user_id=user_id, - ) - - if llm_config.put_inner_thoughts_in_kwargs: - response = unpack_all_inner_thoughts_from_kwargs(response=response, inner_thoughts_key=INNER_THOUGHTS_KWARG) - - telemetry_manager.create_provider_trace( - actor=actor, - provider_trace_create=ProviderTraceCreate( - request_json=chat_completion_request.model_json_schema(), - response_json=response.model_json_schema(), - step_id=step_id, - organization_id=actor.organization_id, - ), - ) - - return response - # elif llm_config.model_endpoint_type == "cohere": # if stream: # raise NotImplementedError(f"Streaming not yet implemented for {llm_config.model_endpoint_type}")