chore: clean up legacy anthropic path (#3905)
This commit is contained in:
@@ -20,16 +20,14 @@ from anthropic.types.beta import (
|
||||
)
|
||||
|
||||
from letta.errors import BedrockError, BedrockPermissionError, ErrorCode, LLMAuthenticationError, LLMError
|
||||
from letta.helpers.datetime_helpers import get_utc_time_int, timestamp_to_datetime
|
||||
from letta.helpers.datetime_helpers import get_utc_time_int
|
||||
from letta.llm_api.aws_bedrock import get_bedrock_client
|
||||
from letta.llm_api.helpers import add_inner_thoughts_to_functions
|
||||
from letta.local_llm.constants import INNER_THOUGHTS_KWARG, INNER_THOUGHTS_KWARG_DESCRIPTION
|
||||
from letta.local_llm.utils import num_tokens_from_functions, num_tokens_from_messages
|
||||
from letta.log import get_logger
|
||||
from letta.otel.tracing import log_event
|
||||
from letta.schemas.enums import ProviderCategory
|
||||
from letta.schemas.message import Message as _Message
|
||||
from letta.schemas.message import MessageRole as _MessageRole
|
||||
from letta.schemas.openai.chat_completion_request import ChatCompletionRequest, Tool
|
||||
from letta.schemas.openai.chat_completion_response import (
|
||||
ChatCompletionChunkResponse,
|
||||
@@ -39,13 +37,11 @@ from letta.schemas.openai.chat_completion_response import (
|
||||
FunctionCall,
|
||||
FunctionCallDelta,
|
||||
)
|
||||
from letta.schemas.openai.chat_completion_response import Message
|
||||
from letta.schemas.openai.chat_completion_response import Message as ChoiceMessage
|
||||
from letta.schemas.openai.chat_completion_response import MessageDelta, ToolCall, ToolCallDelta, UsageStatistics
|
||||
from letta.services.provider_manager import ProviderManager
|
||||
from letta.services.user_manager import UserManager
|
||||
from letta.settings import model_settings
|
||||
from letta.streaming_interface import AgentChunkStreamingInterface, AgentRefreshStreamingInterface
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
@@ -786,43 +782,6 @@ def _prepare_anthropic_request(
|
||||
return data
|
||||
|
||||
|
||||
def anthropic_chat_completions_request(
    data: ChatCompletionRequest,
    inner_thoughts_xml_tag: Optional[str] = "thinking",
    put_inner_thoughts_in_kwargs: bool = False,
    extended_thinking: bool = False,
    max_reasoning_tokens: Optional[int] = None,
    provider_name: Optional[str] = None,
    provider_category: Optional[ProviderCategory] = None,
    betas: Optional[List[str]] = None,
    user_id: Optional[str] = None,
) -> ChatCompletionResponse:
    """Send a blocking (non-streaming) tool-use request to the Anthropic beta Messages API.

    See https://docs.anthropic.com/claude/docs/tool-use

    Args:
        data: The OpenAI-style chat completion request to translate and send.
        inner_thoughts_xml_tag: Tag name forwarded to request preparation and
            to the response converter.
        put_inner_thoughts_in_kwargs: Forwarded to ``_prepare_anthropic_request``.
        extended_thinking: Forwarded to ``_prepare_anthropic_request``.
        max_reasoning_tokens: Forwarded to ``_prepare_anthropic_request``.
        provider_name: Provider whose override key is looked up when
            ``provider_category`` is BYOK.
        provider_category: When equal to ``ProviderCategory.byok`` the API key
            is resolved through ``ProviderManager`` instead of global settings.
        betas: Beta feature flags passed to the API. Defaults to
            ``["tools-2024-04-04"]`` when omitted.
        user_id: User on whose behalf the BYOK key lookup is performed.

    Returns:
        The Anthropic response converted to a ``ChatCompletionResponse``.

    Raises:
        ValueError: If the request is not BYOK and no Anthropic API key is
            configured in ``model_settings``.
    """
    # A list literal in the signature would be shared by every call; build a
    # fresh default per invocation instead.
    if betas is None:
        betas = ["tools-2024-04-04"]

    if provider_category == ProviderCategory.byok:
        actor = UserManager().get_user_or_default(user_id=user_id)
        api_key = ProviderManager().get_override_key(provider_name, actor=actor)
        anthropic_client = anthropic.Anthropic(api_key=api_key)
    elif model_settings.anthropic_api_key:
        # NOTE(review): no key is passed explicitly here; presumably the SDK
        # picks it up from the environment -- confirm.
        anthropic_client = anthropic.Anthropic()
    else:
        raise ValueError("No available Anthropic API key")

    # Keep the prepared payload under its own name rather than rebinding the
    # `data` parameter to a different type.
    request_kwargs = _prepare_anthropic_request(
        data=data,
        inner_thoughts_xml_tag=inner_thoughts_xml_tag,
        put_inner_thoughts_in_kwargs=put_inner_thoughts_in_kwargs,
        extended_thinking=extended_thinking,
        max_reasoning_tokens=max_reasoning_tokens,
    )
    log_event(name="llm_request_sent", attributes=request_kwargs)

    response = anthropic_client.beta.messages.create(
        **request_kwargs,
        betas=betas,
    )
    log_event(name="llm_response_received", attributes={"response": response.json()})

    return convert_anthropic_response_to_chatcompletion(response=response, inner_thoughts_xml_tag=inner_thoughts_xml_tag)
|
||||
|
||||
|
||||
def anthropic_bedrock_chat_completions_request(
|
||||
data: ChatCompletionRequest,
|
||||
inner_thoughts_xml_tag: Optional[str] = "thinking",
|
||||
@@ -920,287 +879,3 @@ def anthropic_chat_completions_request_stream(
|
||||
message_id = chunk.message.id
|
||||
model = chunk.message.model
|
||||
yield convert_anthropic_stream_event_to_chatcompletion(chunk, message_id, model, inner_thoughts_xml_tag)
|
||||
|
||||
|
||||
def _append_stream_delta(accum_message, field: str, delta: Optional[str]) -> None:
    """Append a streamed string fragment to ``accum_message.<field>``, initializing the field if it is unset."""
    if delta is None:
        return
    current = getattr(accum_message, field)
    setattr(accum_message, field, delta if current is None else current + delta)


def _tool_call_slot(accum_message, tool_call_delta):
    """Return the accumulated tool call addressed by the delta's index, or None (after warning) if the index is out of range."""
    if tool_call_delta.index not in range(len(accum_message.tool_calls)):
        warnings.warn(
            f"Tool call index out of range ({tool_call_delta.index})\ncurrent tool calls: {accum_message.tool_calls}\ncurrent delta: {tool_call_delta}"
        )
        return None
    return accum_message.tool_calls[tool_call_delta.index]


def _apply_tool_call_deltas(accum_message, tool_calls_delta, placeholder_id: str) -> None:
    """Merge one chunk's tool-call deltas (id, function name, argument fragments) into ``accum_message.tool_calls``."""
    # If this is the first tool call showing up in a chunk, initialize the list
    # with one placeholder entry per delta.
    if accum_message.tool_calls is None:
        accum_message.tool_calls = [
            ToolCall(id=placeholder_id, function=FunctionCall(name="", arguments="")) for _ in tool_calls_delta
        ]

    # There may be many tool calls in a tool calls delta (e.g. parallel tool calls)
    for tool_call_delta in tool_calls_delta:
        if tool_call_delta.id is not None:
            slot = _tool_call_slot(accum_message, tool_call_delta)
            if slot is not None:
                slot.id = tool_call_delta.id

        function_delta = tool_call_delta.function
        if function_delta is None:
            continue

        if function_delta.name is not None:
            slot = _tool_call_slot(accum_message, tool_call_delta)
            if slot is not None:
                # The name is overwritten, not concatenated.
                slot.function.name = function_delta.name

        if function_delta.arguments is not None:
            slot = _tool_call_slot(accum_message, tool_call_delta)
            if slot is not None:
                # Arguments arrive as string fragments and are concatenated.
                slot.function.arguments += function_delta.arguments


def anthropic_chat_completions_process_stream(
    chat_completion_request: ChatCompletionRequest,
    stream_interface: Optional[Union[AgentChunkStreamingInterface, AgentRefreshStreamingInterface]] = None,
    inner_thoughts_xml_tag: Optional[str] = "thinking",
    put_inner_thoughts_in_kwargs: bool = False,
    extended_thinking: bool = False,
    max_reasoning_tokens: Optional[int] = None,
    provider_name: Optional[str] = None,
    provider_category: Optional[ProviderCategory] = None,
    create_message_id: bool = True,
    create_message_datetime: bool = True,
    betas: Optional[List[str]] = None,
    name: Optional[str] = None,
    user_id: Optional[str] = None,
) -> ChatCompletionResponse:
    """Process a streaming completion response from Anthropic, similar to OpenAI's streaming.

    Chunks are forwarded to ``stream_interface`` as they arrive and are also
    folded into a single ``ChatCompletionResponse`` that is returned once the
    stream is exhausted.

    Args:
        chat_completion_request: The chat completion request (must have ``stream`` set).
        stream_interface: Interface for handling streaming chunks (required).
        inner_thoughts_xml_tag: Tag for inner thoughts; its opening and closing
            markers are stripped from the final message content.
        put_inner_thoughts_in_kwargs: Forwarded to the streaming request.
        extended_thinking: When true, reasoning content, its signature and
            redacted reasoning content are accumulated from the deltas.
        max_reasoning_tokens: Forwarded to the streaming request.
        provider_name: Forwarded to the streaming request.
        provider_category: Forwarded to the streaming request.
        create_message_id: Whether to create a message ID locally instead of
            using the ID carried by the chunks.
        create_message_datetime: Whether to use a locally created timestamp
            instead of the one carried by the chunks.
        betas: Beta features to enable. Defaults to ``["tools-2024-04-04"]``.
        name: Passed through to ``stream_interface.process_chunk``.
        user_id: Forwarded to the streaming request.

    Returns:
        The final ChatCompletionResponse

    Raises:
        AssertionError: If the request is not a streaming request, no stream
            interface is given, or the accumulated response still contains
            placeholder values after the stream ends.
        TypeError: If ``stream_interface`` is of an unsupported type.
        NotImplementedError: If a chunk uses the legacy ``function_call`` field.
    """
    assert chat_completion_request.stream == True
    assert stream_interface is not None, "Required"

    # A list literal in the signature would be shared by every call; build a
    # fresh default per invocation instead.
    if betas is None:
        betas = ["tools-2024-04-04"]

    # Count prompt tokens - we'll get completion tokens from the final response
    chat_history = [m.model_dump(exclude_none=True) for m in chat_completion_request.messages]
    prompt_tokens = num_tokens_from_messages(
        messages=chat_history,
        model=chat_completion_request.model,
    )

    # Add tokens for tools if present
    if chat_completion_request.tools is not None:
        assert chat_completion_request.functions is None
        prompt_tokens += num_tokens_from_functions(
            functions=[t.function.model_dump() for t in chat_completion_request.tools],
            model=chat_completion_request.model,
        )
    elif chat_completion_request.functions is not None:
        assert chat_completion_request.tools is None
        prompt_tokens += num_tokens_from_functions(
            functions=[f.model_dump() for f in chat_completion_request.functions],
            model=chat_completion_request.model,
        )

    # Create a dummy message for ID/datetime if needed
    dummy_message = _Message(
        role=_MessageRole.assistant,
        content=[],
        agent_id="",
        model="",
        name=None,
        tool_calls=None,
        tool_call_id=None,
    )

    # Placeholders that must all be overwritten by the time the stream ends.
    TEMP_STREAM_RESPONSE_ID = "temp_id"
    TEMP_STREAM_FINISH_REASON = "temp_null"
    TEMP_STREAM_TOOL_CALL_ID = "temp_id"

    chat_completion_response = ChatCompletionResponse(
        id=dummy_message.id if create_message_id else TEMP_STREAM_RESPONSE_ID,
        choices=[],
        created=int(dummy_message.created_at.timestamp()),
        model=chat_completion_request.model,
        usage=UsageStatistics(
            prompt_tokens=prompt_tokens,
            total_tokens=prompt_tokens,
        ),
    )

    log_event(name="llm_request_sent", attributes=chat_completion_request.model_dump())

    if stream_interface:
        stream_interface.stream_start()

    completion_tokens = 0
    prev_message_type = None
    message_idx = 0
    try:
        chunk_stream = anthropic_chat_completions_request_stream(
            data=chat_completion_request,
            inner_thoughts_xml_tag=inner_thoughts_xml_tag,
            put_inner_thoughts_in_kwargs=put_inner_thoughts_in_kwargs,
            extended_thinking=extended_thinking,
            max_reasoning_tokens=max_reasoning_tokens,
            provider_name=provider_name,
            provider_category=provider_category,
            betas=betas,
            user_id=user_id,
        )
        for chunk_idx, chat_completion_chunk in enumerate(chunk_stream):
            assert isinstance(chat_completion_chunk, ChatCompletionChunkResponse), type(chat_completion_chunk)

            if stream_interface:
                if isinstance(stream_interface, AgentChunkStreamingInterface):
                    message_type = stream_interface.process_chunk(
                        chat_completion_chunk,
                        message_id=chat_completion_response.id if create_message_id else chat_completion_chunk.id,
                        message_date=(
                            timestamp_to_datetime(chat_completion_response.created)
                            if create_message_datetime
                            else timestamp_to_datetime(chat_completion_chunk.created)
                        ),
                        # if extended_thinking is on, then reasoning_content will be flowing as chunks
                        # TODO handle emitting redacted reasoning content (e.g. as concat?)
                        expect_reasoning_content=extended_thinking,
                        name=name,
                        message_index=message_idx,
                        prev_message_type=prev_message_type,
                    )
                    # Advance the message index whenever the emitted message type changes.
                    if message_type != prev_message_type and message_type is not None and prev_message_type is not None:
                        message_idx += 1
                    if message_type is not None:
                        prev_message_type = message_type
                elif isinstance(stream_interface, AgentRefreshStreamingInterface):
                    stream_interface.process_refresh(chat_completion_response)
                else:
                    raise TypeError(stream_interface)

            if chunk_idx == 0:
                # initialize the choice objects which we will increment with the deltas
                num_choices = len(chat_completion_chunk.choices)
                assert num_choices > 0
                chat_completion_response.choices = [
                    Choice(
                        finish_reason=TEMP_STREAM_FINISH_REASON,  # NOTE: needs to be overwritten
                        index=i,
                        message=Message(
                            role="assistant",
                        ),
                    )
                    for i in range(num_choices)
                ]

            # add the choice delta
            assert len(chat_completion_chunk.choices) == len(chat_completion_response.choices), chat_completion_chunk
            for chunk_choice in chat_completion_chunk.choices:
                accum_choice = chat_completion_response.choices[chunk_choice.index]

                if chunk_choice.finish_reason is not None:
                    accum_choice.finish_reason = chunk_choice.finish_reason

                if chunk_choice.logprobs is not None:
                    accum_choice.logprobs = chunk_choice.logprobs

                accum_message = accum_choice.message
                message_delta = chunk_choice.delta

                _append_stream_delta(accum_message, "content", message_delta.content)

                # NOTE: extended_thinking mode streams reasoning content, a
                # signature, and potentially redacted reasoning content.
                if extended_thinking:
                    for field in ("reasoning_content", "reasoning_content_signature", "redacted_reasoning_content"):
                        _append_stream_delta(accum_message, field, getattr(message_delta, field))

                # TODO(charles) make sure this works for parallel tool calling?
                if message_delta.tool_calls is not None:
                    _apply_tool_call_deltas(accum_message, message_delta.tool_calls, TEMP_STREAM_TOOL_CALL_ID)

                if message_delta.function_call is not None:
                    raise NotImplementedError("Old function_call style not support with stream=True")

            # overwrite response fields based on latest chunk
            if not create_message_id:
                chat_completion_response.id = chat_completion_chunk.id
            if not create_message_datetime:
                chat_completion_response.created = chat_completion_chunk.created
            chat_completion_response.model = chat_completion_chunk.model
            chat_completion_response.system_fingerprint = chat_completion_chunk.system_fingerprint

            # accumulate the output token count reported by the chunk
            if chat_completion_chunk.output_tokens is not None:
                completion_tokens += chat_completion_chunk.output_tokens

    except Exception as e:
        # stream_end() is deliberately NOT called here: the `finally` clause
        # below runs on this path too, and calling it in both places would end
        # the stream twice.
        logger.error("Parsing ChatCompletion stream failed with error:\n%s", e)
        raise
    finally:
        if stream_interface:
            stream_interface.stream_end()

    # make sure we didn't leave temp stuff in
    assert all(c.finish_reason != TEMP_STREAM_FINISH_REASON for c in chat_completion_response.choices)
    assert all(
        tc.id != TEMP_STREAM_TOOL_CALL_ID for c in chat_completion_response.choices for tc in (c.message.tool_calls or [])
    )
    if not create_message_id:
        assert chat_completion_response.id != dummy_message.id

    # compute token usage before returning
    # TODO try actually computing the #tokens instead of assuming the chunks is the same
    chat_completion_response.usage.completion_tokens = completion_tokens
    chat_completion_response.usage.total_tokens = prompt_tokens + completion_tokens

    assert len(chat_completion_response.choices) > 0, chat_completion_response

    log_event(name="llm_response_received", attributes=chat_completion_response.model_dump())

    # Strip the inner-thoughts tag markers from the final content. Skipped when
    # no tag is configured, so a literal "<None>" is never removed by accident.
    if inner_thoughts_xml_tag is not None:
        for choice in chat_completion_response.choices:
            if choice.message.content is not None:
                choice.message.content = choice.message.content.replace(f"<{inner_thoughts_xml_tag}>", "")
                choice.message.content = choice.message.content.replace(f"</{inner_thoughts_xml_tag}>", "")

    return chat_completion_response
|
||||
|
||||
@@ -7,11 +7,7 @@ import requests
|
||||
|
||||
from letta.constants import CLI_WARNING_PREFIX
|
||||
from letta.errors import LettaConfigurationError, RateLimitExceededError
|
||||
from letta.llm_api.anthropic import (
|
||||
anthropic_bedrock_chat_completions_request,
|
||||
anthropic_chat_completions_process_stream,
|
||||
anthropic_chat_completions_request,
|
||||
)
|
||||
from letta.llm_api.anthropic import anthropic_bedrock_chat_completions_request
|
||||
from letta.llm_api.aws_bedrock import has_valid_aws_credentials
|
||||
from letta.llm_api.deepseek import build_deepseek_chat_completions_request, convert_deepseek_response_to_chatcompletion
|
||||
from letta.llm_api.helpers import add_inner_thoughts_to_functions, unpack_all_inner_thoughts_from_kwargs
|
||||
@@ -311,89 +307,6 @@ def create(
|
||||
|
||||
return response
|
||||
|
||||
elif llm_config.model_endpoint_type == "anthropic":
|
||||
if not use_tool_naming:
|
||||
raise NotImplementedError("Only tool calling supported on Anthropic API requests")
|
||||
|
||||
if llm_config.enable_reasoner:
|
||||
llm_config.put_inner_thoughts_in_kwargs = False
|
||||
|
||||
# Force tool calling
|
||||
tool_call = None
|
||||
if functions is None:
|
||||
# Special case for summarization path
|
||||
tools = None
|
||||
tool_choice = None
|
||||
elif force_tool_call is not None:
|
||||
# tool_call = {"type": "function", "function": {"name": force_tool_call}}
|
||||
tool_choice = {"type": "tool", "name": force_tool_call}
|
||||
tools = [{"type": "function", "function": f} for f in functions if f["name"] == force_tool_call]
|
||||
assert functions is not None
|
||||
|
||||
# need to have this setting to be able to put inner thoughts in kwargs
|
||||
llm_config.put_inner_thoughts_in_kwargs = True
|
||||
else:
|
||||
if llm_config.put_inner_thoughts_in_kwargs:
|
||||
# tool_choice_type other than "auto" only plays nice if thinking goes inside the tool calls
|
||||
tool_choice = {"type": "any", "disable_parallel_tool_use": True}
|
||||
else:
|
||||
tool_choice = {"type": "auto", "disable_parallel_tool_use": True}
|
||||
tools = [{"type": "function", "function": f} for f in functions] if functions is not None else None
|
||||
|
||||
chat_completion_request = ChatCompletionRequest(
|
||||
model=llm_config.model,
|
||||
messages=[cast_message_to_subtype(m.to_openai_dict()) for m in messages],
|
||||
tools=tools,
|
||||
tool_choice=tool_choice,
|
||||
max_tokens=llm_config.max_tokens, # Note: max_tokens is required for Anthropic API
|
||||
temperature=llm_config.temperature,
|
||||
stream=stream,
|
||||
)
|
||||
|
||||
# Handle streaming
|
||||
if stream: # Client requested token streaming
|
||||
assert isinstance(stream_interface, (AgentChunkStreamingInterface, AgentRefreshStreamingInterface)), type(stream_interface)
|
||||
|
||||
stream_interface.inner_thoughts_in_kwargs = True
|
||||
response = anthropic_chat_completions_process_stream(
|
||||
chat_completion_request=chat_completion_request,
|
||||
put_inner_thoughts_in_kwargs=llm_config.put_inner_thoughts_in_kwargs,
|
||||
stream_interface=stream_interface,
|
||||
extended_thinking=llm_config.enable_reasoner,
|
||||
max_reasoning_tokens=llm_config.max_reasoning_tokens,
|
||||
provider_name=llm_config.provider_name,
|
||||
provider_category=llm_config.provider_category,
|
||||
name=name,
|
||||
user_id=user_id,
|
||||
)
|
||||
|
||||
else:
|
||||
# Client did not request token streaming (expect a blocking backend response)
|
||||
response = anthropic_chat_completions_request(
|
||||
data=chat_completion_request,
|
||||
put_inner_thoughts_in_kwargs=llm_config.put_inner_thoughts_in_kwargs,
|
||||
extended_thinking=llm_config.enable_reasoner,
|
||||
max_reasoning_tokens=llm_config.max_reasoning_tokens,
|
||||
provider_name=llm_config.provider_name,
|
||||
provider_category=llm_config.provider_category,
|
||||
user_id=user_id,
|
||||
)
|
||||
|
||||
if llm_config.put_inner_thoughts_in_kwargs:
|
||||
response = unpack_all_inner_thoughts_from_kwargs(response=response, inner_thoughts_key=INNER_THOUGHTS_KWARG)
|
||||
|
||||
telemetry_manager.create_provider_trace(
|
||||
actor=actor,
|
||||
provider_trace_create=ProviderTraceCreate(
|
||||
request_json=chat_completion_request.model_json_schema(),
|
||||
response_json=response.model_json_schema(),
|
||||
step_id=step_id,
|
||||
organization_id=actor.organization_id,
|
||||
),
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
# elif llm_config.model_endpoint_type == "cohere":
|
||||
# if stream:
|
||||
# raise NotImplementedError(f"Streaming not yet implemented for {llm_config.model_endpoint_type}")
|
||||
|
||||
Reference in New Issue
Block a user