Add 'apps/core/' from commit 'ea2a7395f4023f5b9fab03e6273db3b64a1181d5'

git-subtree-dir: apps/core
git-subtree-mainline: a8963e11e7a5a0059acbc849ce768e1eee80df61
git-subtree-split: ea2a7395f4023f5b9fab03e6273db3b64a1181d5
This commit is contained in:
Shubham Naik
2024-12-22 20:31:22 -08:00
commit 5a743d1dc4
478 changed files with 65642 additions and 0 deletions

View File

389
letta/llm_api/anthropic.py Normal file
View File

@@ -0,0 +1,389 @@
import json
import re
from typing import List, Optional, Union
from letta.llm_api.helpers import make_post_request
from letta.schemas.message import Message
from letta.schemas.openai.chat_completion_request import ChatCompletionRequest, Tool
from letta.schemas.openai.chat_completion_response import (
ChatCompletionResponse,
Choice,
FunctionCall,
)
from letta.schemas.openai.chat_completion_response import (
Message as ChoiceMessage, # NOTE: avoid conflict with our own Letta Message datatype
)
from letta.schemas.openai.chat_completion_response import ToolCall, UsageStatistics
from letta.utils import get_utc_time, smart_urljoin
BASE_URL = "https://api.anthropic.com/v1"
# https://docs.anthropic.com/claude/docs/models-overview
# Sadly hardcoded
MODEL_LIST = [
{
"name": "claude-3-opus-20240229",
"context_window": 200000,
},
{
"name": "claude-3-5-sonnet-20241022",
"context_window": 200000,
},
{
"name": "claude-3-5-haiku-20241022",
"context_window": 200000,
},
]
DUMMY_FIRST_USER_MESSAGE = "User initializing bootup sequence."
def antropic_get_model_context_window(url: str, api_key: Union[str, None], model: str) -> int:
for model_dict in anthropic_get_model_list(url=url, api_key=api_key):
if model_dict["name"] == model:
return model_dict["context_window"]
raise ValueError(f"Can't find model '{model}' in Anthropic model list")
def anthropic_get_model_list(url: str, api_key: Union[str, None]) -> dict:
"""https://docs.anthropic.com/claude/docs/models-overview"""
# NOTE: currently there is no GET /models, so we need to hardcode
return MODEL_LIST
def convert_tools_to_anthropic_format(tools: List[Tool]) -> List[dict]:
"""See: https://docs.anthropic.com/claude/docs/tool-use
OpenAI style:
"tools": [{
"type": "function",
"function": {
"name": "find_movies",
"description": "find ....",
"parameters": {
"type": "object",
"properties": {
PARAM: {
"type": PARAM_TYPE, # eg "string"
"description": PARAM_DESCRIPTION,
},
...
},
"required": List[str],
}
}
}
]
Anthropic style:
"tools": [{
"name": "find_movies",
"description": "find ....",
"input_schema": {
"type": "object",
"properties": {
PARAM: {
"type": PARAM_TYPE, # eg "string"
"description": PARAM_DESCRIPTION,
},
...
},
"required": List[str],
}
}
]
Two small differences:
- 1 level less of nesting
- "parameters" -> "input_schema"
"""
formatted_tools = []
for tool in tools:
formatted_tool = {
"name" : tool.function.name,
"description" : tool.function.description,
"input_schema" : tool.function.parameters or {
"type": "object",
"properties": {},
"required": []
}
}
formatted_tools.append(formatted_tool)
return formatted_tools
def merge_tool_results_into_user_messages(messages: List[dict]):
"""Anthropic API doesn't allow role 'tool'->'user' sequences
Example HTTP error:
messages: roles must alternate between "user" and "assistant", but found multiple "user" roles in a row
From: https://docs.anthropic.com/claude/docs/tool-use
You may be familiar with other APIs that return tool use as separate from the model's primary output,
or which use a special-purpose tool or function message role.
In contrast, Anthropic's models and API are built around alternating user and assistant messages,
where each message is an array of rich content blocks: text, image, tool_use, and tool_result.
"""
# TODO walk through the messages list
# When a dict (dict_A) with 'role' == 'user' is followed by a dict with 'role' == 'user' (dict B), do the following
# dict_A["content"] = dict_A["content"] + dict_B["content"]
# The result should be a new merged_messages list that doesn't have any back-to-back dicts with 'role' == 'user'
merged_messages = []
if not messages:
return merged_messages
# Start with the first message in the list
current_message = messages[0]
for next_message in messages[1:]:
if current_message["role"] == "user" and next_message["role"] == "user":
# Merge contents of the next user message into current one
current_content = (
current_message["content"]
if isinstance(current_message["content"], list)
else [{"type": "text", "text": current_message["content"]}]
)
next_content = (
next_message["content"]
if isinstance(next_message["content"], list)
else [{"type": "text", "text": next_message["content"]}]
)
merged_content = current_content + next_content
current_message["content"] = merged_content
else:
# Append the current message to result as it's complete
merged_messages.append(current_message)
# Move on to the next message
current_message = next_message
# Append the last processed message to the result
merged_messages.append(current_message)
return merged_messages
def remap_finish_reason(stop_reason: str) -> str:
"""Remap Anthropic's 'stop_reason' to OpenAI 'finish_reason'
OpenAI: 'stop', 'length', 'function_call', 'content_filter', null
see: https://platform.openai.com/docs/guides/text-generation/chat-completions-api
From: https://docs.anthropic.com/claude/reference/migrating-from-text-completions-to-messages#stop-reason
Messages have a stop_reason of one of the following values:
"end_turn": The conversational turn ended naturally.
"stop_sequence": One of your specified custom stop sequences was generated.
"max_tokens": (unchanged)
"""
if stop_reason == "end_turn":
return "stop"
elif stop_reason == "stop_sequence":
return "stop"
elif stop_reason == "max_tokens":
return "length"
elif stop_reason == "tool_use":
return "function_call"
else:
raise ValueError(f"Unexpected stop_reason: {stop_reason}")
def strip_xml_tags(string: str, tag: Optional[str]) -> str:
if tag is None:
return string
# Construct the regular expression pattern to find the start and end tags
tag_pattern = f"<{tag}.*?>|</{tag}>"
# Use the regular expression to replace the tags with an empty string
return re.sub(tag_pattern, "", string)
def convert_anthropic_response_to_chatcompletion(
response_json: dict, # REST response from Google AI API
inner_thoughts_xml_tag: Optional[str] = None,
) -> ChatCompletionResponse:
"""
Example response from Claude 3:
response.json = {
'id': 'msg_01W1xg9hdRzbeN2CfZM7zD2w',
'type': 'message',
'role': 'assistant',
'content': [
{
'type': 'text',
'text': "<thinking>Analyzing user login event. This is Chad's first
interaction with me. I will adjust my personality and rapport accordingly.</thinking>"
},
{
'type':
'tool_use',
'id': 'toolu_01Ka4AuCmfvxiidnBZuNfP1u',
'name': 'core_memory_append',
'input': {
'name': 'human',
'content': 'Chad is logging in for the first time. I will aim to build a warm
and welcoming rapport.',
'request_heartbeat': True
}
}
],
'model': 'claude-3-haiku-20240307',
'stop_reason': 'tool_use',
'stop_sequence': None,
'usage': {
'input_tokens': 3305,
'output_tokens': 141
}
}
"""
prompt_tokens = response_json["usage"]["input_tokens"]
completion_tokens = response_json["usage"]["output_tokens"]
finish_reason = remap_finish_reason(response_json["stop_reason"])
if isinstance(response_json["content"], list):
if len(response_json["content"]) > 1:
# inner mono + function call
assert len(response_json["content"]) == 2, response_json
assert response_json["content"][0]["type"] == "text", response_json
assert response_json["content"][1]["type"] == "tool_use", response_json
content = strip_xml_tags(string=response_json["content"][0]["text"], tag=inner_thoughts_xml_tag)
tool_calls = [
ToolCall(
id=response_json["content"][1]["id"],
type="function",
function=FunctionCall(
name=response_json["content"][1]["name"],
arguments=json.dumps(response_json["content"][1]["input"], indent=2),
),
)
]
elif len(response_json["content"]) == 1:
if response_json["content"][0]["type"] == "tool_use":
# function call only
content = None
tool_calls = [
ToolCall(
id=response_json["content"][0]["id"],
type="function",
function=FunctionCall(
name=response_json["content"][0]["name"],
arguments=json.dumps(response_json["content"][0]["input"], indent=2),
),
)
]
else:
# inner mono only
content = strip_xml_tags(string=response_json["content"][0]["text"], tag=inner_thoughts_xml_tag)
tool_calls = None
else:
raise RuntimeError("Unexpected type for content in response_json.")
assert response_json["role"] == "assistant", response_json
choice = Choice(
index=0,
finish_reason=finish_reason,
message=ChoiceMessage(
role=response_json["role"],
content=content,
tool_calls=tool_calls,
),
)
return ChatCompletionResponse(
id=response_json["id"],
choices=[choice],
created=get_utc_time(),
model=response_json["model"],
usage=UsageStatistics(
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=prompt_tokens + completion_tokens,
),
)
def anthropic_chat_completions_request(
url: str,
api_key: str,
data: ChatCompletionRequest,
inner_thoughts_xml_tag: Optional[str] = "thinking",
) -> ChatCompletionResponse:
"""https://docs.anthropic.com/claude/docs/tool-use"""
url = smart_urljoin(url, "messages")
headers = {
"Content-Type": "application/json",
"x-api-key": api_key,
# NOTE: beta headers for tool calling
"anthropic-version": "2023-06-01",
"anthropic-beta": "tools-2024-04-04",
}
# convert the tools
anthropic_tools = None if data.tools is None else convert_tools_to_anthropic_format(data.tools)
# pydantic -> dict
data = data.model_dump(exclude_none=True)
if "functions" in data:
raise ValueError(f"'functions' unexpected in Anthropic API payload")
# If tools == None, strip from the payload
if "tools" in data and data["tools"] is None:
data.pop("tools")
data.pop("tool_choice", None) # extra safe, should exist always (default="auto")
# Remap to our converted tools
if anthropic_tools is not None:
data["tools"] = anthropic_tools
# TODO: Add support for other tool_choice options like "auto", "any"
if len(anthropic_tools) == 1:
data["tool_choice"] = {
"type": "tool", # Changed from "function" to "tool"
"name": anthropic_tools[0]["name"], # Directly specify name without nested "function" object
"disable_parallel_tool_use": True # Force single tool use
}
# Move 'system' to the top level
# 'messages: Unexpected role "system". The Messages API accepts a top-level `system` parameter, not "system" as an input message role.'
assert data["messages"][0]["role"] == "system", f"Expected 'system' role in messages[0]:\n{data['messages'][0]}"
data["system"] = data["messages"][0]["content"]
data["messages"] = data["messages"][1:]
# set `content` to None if missing
for message in data["messages"]:
if "content" not in message:
message["content"] = None
# Convert to Anthropic format
msg_objs = [Message.dict_to_message(user_id=None, agent_id=None, openai_message_dict=m) for m in data["messages"]]
data["messages"] = [m.to_anthropic_dict(inner_thoughts_xml_tag=inner_thoughts_xml_tag) for m in msg_objs]
# Handling Anthropic special requirement for 'user' message in front
# messages: first message must use the "user" role'
if data["messages"][0]["role"] != "user":
data["messages"] = [{"role": "user", "content": DUMMY_FIRST_USER_MESSAGE}] + data["messages"]
# Handle Anthropic's restriction on alternating user/assistant messages
data["messages"] = merge_tool_results_into_user_messages(data["messages"])
# Anthropic also wants max_tokens in the input
# It's also part of ChatCompletions
assert "max_tokens" in data, data
# Remove extra fields used by OpenAI but not Anthropic
data.pop("frequency_penalty", None)
data.pop("logprobs", None)
data.pop("n", None)
data.pop("top_p", None)
data.pop("presence_penalty", None)
data.pop("user", None)
response_json = make_post_request(url, headers, data)
return convert_anthropic_response_to_chatcompletion(response_json=response_json, inner_thoughts_xml_tag=inner_thoughts_xml_tag)

View File

@@ -0,0 +1,140 @@
from collections import defaultdict
import requests
from letta.llm_api.helpers import make_post_request
from letta.schemas.llm_config import LLMConfig
from letta.schemas.openai.chat_completion_response import ChatCompletionResponse
from letta.schemas.openai.chat_completions import ChatCompletionRequest
from letta.schemas.openai.embedding_response import EmbeddingResponse
from letta.settings import ModelSettings
def get_azure_chat_completions_endpoint(base_url: str, model: str, api_version: str):
return f"{base_url}/openai/deployments/{model}/chat/completions?api-version={api_version}"
def get_azure_embeddings_endpoint(base_url: str, model: str, api_version: str):
return f"{base_url}/openai/deployments/{model}/embeddings?api-version={api_version}"
def get_azure_model_list_endpoint(base_url: str, api_version: str):
return f"{base_url}/openai/models?api-version={api_version}"
def get_azure_deployment_list_endpoint(base_url: str):
# Please note that it has to be 2023-03-15-preview
# That's the only api version that works with this deployments endpoint
# TODO: Use the Azure Client library here instead
return f"{base_url}/openai/deployments?api-version=2023-03-15-preview"
def azure_openai_get_deployed_model_list(base_url: str, api_key: str, api_version: str) -> list:
"""https://learn.microsoft.com/en-us/rest/api/azureopenai/models/list?view=rest-azureopenai-2023-05-15&tabs=HTTP"""
# https://xxx.openai.azure.com/openai/models?api-version=xxx
headers = {"Content-Type": "application/json"}
if api_key is not None:
headers["api-key"] = f"{api_key}"
# 1. Get all available models
url = get_azure_model_list_endpoint(base_url, api_version)
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
except requests.RequestException as e:
raise RuntimeError(f"Failed to retrieve model list: {e}")
all_available_models = response.json().get("data", [])
# 2. Get all the deployed models
url = get_azure_deployment_list_endpoint(base_url)
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
except requests.RequestException as e:
raise RuntimeError(f"Failed to retrieve model list: {e}")
deployed_models = response.json().get("data", [])
deployed_model_names = set([m["id"] for m in deployed_models])
# 3. Only return the models in available models if they have been deployed
deployed_models = [m for m in all_available_models if m["id"] in deployed_model_names]
# 4. Remove redundant deployments, only include the ones with the latest deployment
# Create a dictionary to store the latest model for each ID
latest_models = defaultdict()
# Iterate through the models and update the dictionary with the most recent model
for model in deployed_models:
model_id = model["id"]
updated_at = model["created_at"]
# If the model ID is new or the current model has a more recent created_at, update the dictionary
if model_id not in latest_models or updated_at > latest_models[model_id]["created_at"]:
latest_models[model_id] = model
# Extract the unique models
return list(latest_models.values())
def azure_openai_get_chat_completion_model_list(base_url: str, api_key: str, api_version: str) -> list:
model_list = azure_openai_get_deployed_model_list(base_url, api_key, api_version)
# Extract models that support text generation
model_options = [m for m in model_list if m.get("capabilities").get("chat_completion") == True]
return model_options
def azure_openai_get_embeddings_model_list(base_url: str, api_key: str, api_version: str, require_embedding_in_name: bool = True) -> list:
def valid_embedding_model(m: dict):
valid_name = True
if require_embedding_in_name:
valid_name = "embedding" in m["id"]
return m.get("capabilities").get("embeddings") == True and valid_name
model_list = azure_openai_get_deployed_model_list(base_url, api_key, api_version)
# Extract models that support embeddings
model_options = [m for m in model_list if valid_embedding_model(m)]
return model_options
def azure_openai_chat_completions_request(
model_settings: ModelSettings, llm_config: LLMConfig, api_key: str, chat_completion_request: ChatCompletionRequest
) -> ChatCompletionResponse:
"""https://learn.microsoft.com/en-us/azure/ai-services/openai/reference#chat-completions"""
assert api_key is not None, "Missing required field when calling Azure OpenAI"
headers = {"Content-Type": "application/json", "api-key": f"{api_key}"}
data = chat_completion_request.model_dump(exclude_none=True)
# If functions == None, strip from the payload
if "functions" in data and data["functions"] is None:
data.pop("functions")
data.pop("function_call", None) # extra safe, should exist always (default="auto")
if "tools" in data and data["tools"] is None:
data.pop("tools")
data.pop("tool_choice", None) # extra safe, should exist always (default="auto")
url = get_azure_chat_completions_endpoint(model_settings.azure_base_url, llm_config.model, model_settings.azure_api_version)
response_json = make_post_request(url, headers, data)
# NOTE: azure openai does not include "content" in the response when it is None, so we need to add it
if "content" not in response_json["choices"][0].get("message"):
response_json["choices"][0]["message"]["content"] = None
response = ChatCompletionResponse(**response_json) # convert to 'dot-dict' style which is the openai python client default
return response
def azure_openai_embeddings_request(
resource_name: str, deployment_id: str, api_version: str, api_key: str, data: dict
) -> EmbeddingResponse:
"""https://learn.microsoft.com/en-us/azure/ai-services/openai/reference#embeddings"""
url = f"https://{resource_name}.openai.azure.com/openai/deployments/{deployment_id}/embeddings?api-version={api_version}"
headers = {"Content-Type": "application/json", "api-key": f"{api_key}"}
response_json = make_post_request(url, headers, data)
return EmbeddingResponse(**response_json)

View File

@@ -0,0 +1,10 @@
AZURE_MODEL_TO_CONTEXT_LENGTH = {
"babbage-002": 16384,
"davinci-002": 16384,
"gpt-35-turbo-0613": 4096,
"gpt-35-turbo-1106": 16385,
"gpt-35-turbo-0125": 16385,
"gpt-4-0613": 8192,
"gpt-4o-mini-2024-07-18": 128000,
"gpt-4o-2024-08-06": 128000,
}

396
letta/llm_api/cohere.py Normal file
View File

@@ -0,0 +1,396 @@
import json
import uuid
from typing import List, Optional, Union
import requests
from letta.local_llm.utils import count_tokens
from letta.schemas.message import Message
from letta.schemas.openai.chat_completion_request import ChatCompletionRequest, Tool
from letta.schemas.openai.chat_completion_response import (
ChatCompletionResponse,
Choice,
FunctionCall,
)
from letta.schemas.openai.chat_completion_response import (
Message as ChoiceMessage, # NOTE: avoid conflict with our own Letta Message datatype
)
from letta.schemas.openai.chat_completion_response import ToolCall, UsageStatistics
from letta.utils import get_tool_call_id, get_utc_time, json_dumps, smart_urljoin
BASE_URL = "https://api.cohere.ai/v1"
# models that we know will work with Letta
COHERE_VALID_MODEL_LIST = [
"command-r-plus",
]
def cohere_get_model_details(url: str, api_key: Union[str, None], model: str) -> int:
"""https://docs.cohere.com/reference/get-model"""
from letta.utils import printd
url = smart_urljoin(url, "models")
url = smart_urljoin(url, model)
headers = {
"accept": "application/json",
"authorization": f"bearer {api_key}",
}
printd(f"Sending request to {url}")
try:
response = requests.get(url, headers=headers)
printd(f"response = {response}")
response.raise_for_status() # Raises HTTPError for 4XX/5XX status
response = response.json() # convert to dict from string
return response
except requests.exceptions.HTTPError as http_err:
# Handle HTTP errors (e.g., response 4XX, 5XX)
printd(f"Got HTTPError, exception={http_err}")
raise http_err
except requests.exceptions.RequestException as req_err:
# Handle other requests-related errors (e.g., connection error)
printd(f"Got RequestException, exception={req_err}")
raise req_err
except Exception as e:
# Handle other potential errors
printd(f"Got unknown Exception, exception={e}")
raise e
def cohere_get_model_context_window(url: str, api_key: Union[str, None], model: str) -> int:
model_details = cohere_get_model_details(url=url, api_key=api_key, model=model)
return model_details["context_length"]
def cohere_get_model_list(url: str, api_key: Union[str, None]) -> dict:
"""https://docs.cohere.com/reference/list-models"""
from letta.utils import printd
url = smart_urljoin(url, "models")
headers = {
"accept": "application/json",
"authorization": f"bearer {api_key}",
}
printd(f"Sending request to {url}")
try:
response = requests.get(url, headers=headers)
printd(f"response = {response}")
response.raise_for_status() # Raises HTTPError for 4XX/5XX status
response = response.json() # convert to dict from string
return response["models"]
except requests.exceptions.HTTPError as http_err:
# Handle HTTP errors (e.g., response 4XX, 5XX)
printd(f"Got HTTPError, exception={http_err}")
raise http_err
except requests.exceptions.RequestException as req_err:
# Handle other requests-related errors (e.g., connection error)
printd(f"Got RequestException, exception={req_err}")
raise req_err
except Exception as e:
# Handle other potential errors
printd(f"Got unknown Exception, exception={e}")
raise e
def remap_finish_reason(finish_reason: str) -> str:
"""Remap Cohere's 'finish_reason' to OpenAI 'finish_reason'
OpenAI: 'stop', 'length', 'function_call', 'content_filter', null
see: https://platform.openai.com/docs/guides/text-generation/chat-completions-api
Cohere finish_reason is different but undocumented ???
"""
if finish_reason == "COMPLETE":
return "stop"
elif finish_reason == "MAX_TOKENS":
return "length"
# elif stop_reason == "tool_use":
# return "function_call"
else:
raise ValueError(f"Unexpected stop_reason: {finish_reason}")
def convert_cohere_response_to_chatcompletion(
response_json: dict, # REST response from API
model: str, # Required since not returned
inner_thoughts_in_kwargs: Optional[bool] = True,
) -> ChatCompletionResponse:
"""
Example response from command-r-plus:
response.json = {
'response_id': '28c47751-acce-41cd-8c89-c48a15ac33cf',
'text': '',
'generation_id': '84209c9e-2868-4984-82c5-063b748b7776',
'chat_history': [
{
'role': 'CHATBOT',
'message': 'Bootup sequence complete. Persona activated. Testing messaging functionality.'
},
{
'role': 'SYSTEM',
'message': '{"status": "OK", "message": null, "time": "2024-04-11 11:22:36 PM PDT-0700"}'
}
],
'finish_reason': 'COMPLETE',
'meta': {
'api_version': {'version': '1'},
'billed_units': {'input_tokens': 692, 'output_tokens': 20},
'tokens': {'output_tokens': 20}
},
'tool_calls': [
{
'name': 'send_message',
'parameters': {
'message': "Hello Chad, it's Sam. How are you feeling today?"
}
}
]
}
"""
if "billed_units" in response_json["meta"]:
prompt_tokens = response_json["meta"]["billed_units"]["input_tokens"]
completion_tokens = response_json["meta"]["billed_units"]["output_tokens"]
else:
# For some reason input_tokens not included in 'meta' 'tokens' dict?
prompt_tokens = count_tokens(json_dumps(response_json["chat_history"])) # NOTE: this is a very rough approximation
completion_tokens = response_json["meta"]["tokens"]["output_tokens"]
finish_reason = remap_finish_reason(response_json["finish_reason"])
if "tool_calls" in response_json and response_json["tool_calls"] is not None:
inner_thoughts = []
tool_calls = []
for tool_call_response in response_json["tool_calls"]:
function_name = tool_call_response["name"]
function_args = tool_call_response["parameters"]
if inner_thoughts_in_kwargs:
from letta.local_llm.constants import INNER_THOUGHTS_KWARG
assert INNER_THOUGHTS_KWARG in function_args
# NOTE:
inner_thoughts.append(function_args.pop(INNER_THOUGHTS_KWARG))
tool_calls.append(
ToolCall(
id=get_tool_call_id(),
type="function",
function=FunctionCall(
name=function_name,
arguments=json.dumps(function_args),
),
)
)
# NOTE: no multi-call support for now
assert len(tool_calls) == 1, tool_calls
content = inner_thoughts[0]
else:
# raise NotImplementedError(f"Expected a tool call response from Cohere API")
content = response_json["text"]
tool_calls = None
# In Cohere API empty string == null
content = None if content == "" else content
assert content is not None or tool_calls is not None, "Response message must have either content or tool_calls"
choice = Choice(
index=0,
finish_reason=finish_reason,
message=ChoiceMessage(
role="assistant",
content=content,
tool_calls=tool_calls,
),
)
return ChatCompletionResponse(
id=response_json["response_id"],
choices=[choice],
created=get_utc_time(),
model=model,
usage=UsageStatistics(
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=prompt_tokens + completion_tokens,
),
)
def convert_tools_to_cohere_format(tools: List[Tool], inner_thoughts_in_kwargs: Optional[bool] = True) -> List[dict]:
"""See: https://docs.cohere.com/reference/chat
OpenAI style:
"tools": [{
"type": "function",
"function": {
"name": "find_movies",
"description": "find ....",
"parameters": {
"type": "object",
"properties": {
PARAM: {
"type": PARAM_TYPE, # eg "string"
"description": PARAM_DESCRIPTION,
},
...
},
"required": List[str],
}
}
}]
Cohere style:
"tools": [{
"name": "find_movies",
"description": "find ....",
"parameter_definitions": {
PARAM_NAME: {
"description": PARAM_DESCRIPTION,
"type": PARAM_TYPE, # eg "string"
"required": <boolean>,
}
},
}
}]
"""
tools_dict_list = []
for tool in tools:
tools_dict_list.append(
{
"name": tool.function.name,
"description": tool.function.description,
"parameter_definitions": {
p_name: {
"description": p_fields["description"],
"type": p_fields["type"],
"required": p_name in tool.function.parameters["required"],
}
for p_name, p_fields in tool.function.parameters["properties"].items()
},
}
)
if inner_thoughts_in_kwargs:
# NOTE: since Cohere doesn't allow "text" in the response when a tool call happens, if we want
# a simultaneous CoT + tool call we need to put it inside a kwarg
from letta.local_llm.constants import (
INNER_THOUGHTS_KWARG,
INNER_THOUGHTS_KWARG_DESCRIPTION,
)
for cohere_tool in tools_dict_list:
cohere_tool["parameter_definitions"][INNER_THOUGHTS_KWARG] = {
"description": INNER_THOUGHTS_KWARG_DESCRIPTION,
"type": "string",
"required": True,
}
return tools_dict_list
def cohere_chat_completions_request(
url: str,
api_key: str,
chat_completion_request: ChatCompletionRequest,
) -> ChatCompletionResponse:
"""https://docs.cohere.com/docs/multi-step-tool-use"""
from letta.utils import printd
url = smart_urljoin(url, "chat")
headers = {
"Content-Type": "application/json",
"Authorization": f"bearer {api_key}",
}
# convert the tools
cohere_tools = None if chat_completion_request.tools is None else convert_tools_to_cohere_format(chat_completion_request.tools)
# pydantic -> dict
data = chat_completion_request.model_dump(exclude_none=True)
if "functions" in data:
raise ValueError(f"'functions' unexpected in Anthropic API payload")
# If tools == None, strip from the payload
if "tools" in data and data["tools"] is None:
data.pop("tools")
data.pop("tool_choice", None) # extra safe, should exist always (default="auto")
# Convert messages to Cohere format
msg_objs = [Message.dict_to_message(user_id=uuid.uuid4(), agent_id=uuid.uuid4(), openai_message_dict=m) for m in data["messages"]]
# System message 0 should instead be a "preamble"
# See: https://docs.cohere.com/reference/chat
# The chat_history parameter should not be used for SYSTEM messages in most cases. Instead, to add a SYSTEM role message at the beginning of a conversation, the preamble parameter should be used.
assert msg_objs[0].role == "system", msg_objs[0]
preamble = msg_objs[0].text
# data["messages"] = [m.to_cohere_dict() for m in msg_objs[1:]]
data["messages"] = []
for m in msg_objs[1:]:
ms = m.to_cohere_dict() # NOTE: returns List[dict]
data["messages"].extend(ms)
assert data["messages"][-1]["role"] == "USER", data["messages"][-1]
data = {
"preamble": preamble,
"chat_history": data["messages"][:-1],
"message": data["messages"][-1]["message"],
"tools": cohere_tools,
}
# Move 'system' to the top level
# 'messages: Unexpected role "system". The Messages API accepts a top-level `system` parameter, not "system" as an input message role.'
# assert data["messages"][0]["role"] == "system", f"Expected 'system' role in messages[0]:\n{data['messages'][0]}"
# data["system"] = data["messages"][0]["content"]
# data["messages"] = data["messages"][1:]
# Convert to Anthropic format
# msg_objs = [Message.dict_to_message(user_id=uuid.uuid4(), agent_id=uuid.uuid4(), openai_message_dict=m) for m in data["messages"]]
# data["messages"] = [m.to_anthropic_dict(inner_thoughts_xml_tag=inner_thoughts_xml_tag) for m in msg_objs]
# Handling Anthropic special requirement for 'user' message in front
# messages: first message must use the "user" role'
# if data["messages"][0]["role"] != "user":
# data["messages"] = [{"role": "user", "content": DUMMY_FIRST_USER_MESSAGE}] + data["messages"]
# Handle Anthropic's restriction on alternating user/assistant messages
# data["messages"] = merge_tool_results_into_user_messages(data["messages"])
# Anthropic also wants max_tokens in the input
# It's also part of ChatCompletions
# assert "max_tokens" in data, data
# Remove extra fields used by OpenAI but not Anthropic
# data.pop("frequency_penalty", None)
# data.pop("logprobs", None)
# data.pop("n", None)
# data.pop("top_p", None)
# data.pop("presence_penalty", None)
# data.pop("user", None)
# data.pop("tool_choice", None)
printd(f"Sending request to {url}")
try:
response = requests.post(url, headers=headers, json=data)
printd(f"response = {response}")
response.raise_for_status() # Raises HTTPError for 4XX/5XX status
response = response.json() # convert to dict from string
printd(f"response.json = {response}")
response = convert_cohere_response_to_chatcompletion(response_json=response, model=chat_completion_request.model)
return response
except requests.exceptions.HTTPError as http_err:
# Handle HTTP errors (e.g., response 4XX, 5XX)
printd(f"Got HTTPError, exception={http_err}, payload={data}")
raise http_err
except requests.exceptions.RequestException as req_err:
# Handle other requests-related errors (e.g., connection error)
printd(f"Got RequestException, exception={req_err}")
raise req_err
except Exception as e:
# Handle other potential errors
printd(f"Got unknown Exception, exception={e}")
raise e

441
letta/llm_api/google_ai.py Normal file
View File

@@ -0,0 +1,441 @@
import uuid
from typing import List, Optional, Tuple
import requests
from letta.constants import NON_USER_MSG_PREFIX
from letta.llm_api.helpers import make_post_request
from letta.local_llm.json_parser import clean_json_string_extra_backslash
from letta.local_llm.utils import count_tokens
from letta.schemas.openai.chat_completion_request import Tool
from letta.schemas.openai.chat_completion_response import (
ChatCompletionResponse,
Choice,
FunctionCall,
Message,
ToolCall,
UsageStatistics,
)
from letta.utils import get_tool_call_id, get_utc_time, json_dumps
def get_gemini_endpoint_and_headers(
base_url: str, model: Optional[str], api_key: str, key_in_header: bool = True, generate_content: bool = False
) -> Tuple[str, dict]:
"""
Dynamically generate the model endpoint and headers.
"""
url = f"{base_url}/v1beta/models"
# Add the model
if model is not None:
url += f"/{model}"
# Add extension for generating content if we're hitting the LM
if generate_content:
url += ":generateContent"
# Decide if api key should be in header or not
# Two ways to pass the key: https://ai.google.dev/tutorials/setup
if key_in_header:
headers = {"Content-Type": "application/json", "x-goog-api-key": api_key}
else:
url += f"?key={api_key}"
headers = {"Content-Type": "application/json"}
return url, headers
def google_ai_get_model_details(base_url: str, api_key: str, model: str, key_in_header: bool = True) -> List[dict]:
from letta.utils import printd
url, headers = get_gemini_endpoint_and_headers(base_url, model, api_key, key_in_header)
try:
response = requests.get(url, headers=headers)
printd(f"response = {response}")
response.raise_for_status() # Raises HTTPError for 4XX/5XX status
response = response.json() # convert to dict from string
printd(f"response.json = {response}")
# Grab the models out
return response
except requests.exceptions.HTTPError as http_err:
# Handle HTTP errors (e.g., response 4XX, 5XX)
printd(f"Got HTTPError, exception={http_err}")
# Print the HTTP status code
print(f"HTTP Error: {http_err.response.status_code}")
# Print the response content (error message from server)
print(f"Message: {http_err.response.text}")
raise http_err
except requests.exceptions.RequestException as req_err:
# Handle other requests-related errors (e.g., connection error)
printd(f"Got RequestException, exception={req_err}")
raise req_err
except Exception as e:
# Handle other potential errors
printd(f"Got unknown Exception, exception={e}")
raise e
def google_ai_get_model_context_window(base_url: str, api_key: str, model: str, key_in_header: bool = True) -> int:
model_details = google_ai_get_model_details(base_url=base_url, api_key=api_key, model=model, key_in_header=key_in_header)
# TODO should this be:
# return model_details["inputTokenLimit"] + model_details["outputTokenLimit"]
return int(model_details["inputTokenLimit"])
def google_ai_get_model_list(base_url: str, api_key: str, key_in_header: bool = True) -> List[dict]:
from letta.utils import printd
url, headers = get_gemini_endpoint_and_headers(base_url, None, api_key, key_in_header)
try:
response = requests.get(url, headers=headers)
response.raise_for_status() # Raises HTTPError for 4XX/5XX status
response = response.json() # convert to dict from string
# Grab the models out
model_list = response["models"]
return model_list
except requests.exceptions.HTTPError as http_err:
# Handle HTTP errors (e.g., response 4XX, 5XX)
printd(f"Got HTTPError, exception={http_err}")
# Print the HTTP status code
print(f"HTTP Error: {http_err.response.status_code}")
# Print the response content (error message from server)
print(f"Message: {http_err.response.text}")
raise http_err
except requests.exceptions.RequestException as req_err:
# Handle other requests-related errors (e.g., connection error)
printd(f"Got RequestException, exception={req_err}")
raise req_err
except Exception as e:
# Handle other potential errors
printd(f"Got unknown Exception, exception={e}")
raise e
def add_dummy_model_messages(messages: List[dict]) -> List[dict]:
"""Google AI API requires all function call returns are immediately followed by a 'model' role message.
In Letta, the 'model' will often call a function (e.g. send_message) that itself yields to the user,
so there is no natural follow-up 'model' role message.
To satisfy the Google AI API restrictions, we can add a dummy 'yield' message
with role == 'model' that is placed in-betweeen and function output
(role == 'tool') and user message (role == 'user').
"""
dummy_yield_message = {"role": "model", "parts": [{"text": f"{NON_USER_MSG_PREFIX}Function call returned, waiting for user response."}]}
messages_with_padding = []
for i, message in enumerate(messages):
messages_with_padding.append(message)
# Check if the current message role is 'tool' and the next message role is 'user'
if message["role"] in ["tool", "function"] and (i + 1 < len(messages) and messages[i + 1]["role"] == "user"):
messages_with_padding.append(dummy_yield_message)
return messages_with_padding
# TODO use pydantic model as input
def to_google_ai(openai_message_dict: dict) -> dict:
# TODO supports "parts" as part of multimodal support
assert not isinstance(openai_message_dict["content"], list), "Multi-part content is message not yet supported"
if openai_message_dict["role"] == "user":
google_ai_message_dict = {
"role": "user",
"parts": [{"text": openai_message_dict["content"]}],
}
elif openai_message_dict["role"] == "assistant":
google_ai_message_dict = {
"role": "model", # NOTE: diff
"parts": [{"text": openai_message_dict["content"]}],
}
elif openai_message_dict["role"] == "tool":
google_ai_message_dict = {
"role": "function", # NOTE: diff
"parts": [{"text": openai_message_dict["content"]}],
}
else:
raise ValueError(f"Unsupported conversion (OpenAI -> Google AI) from role {openai_message_dict['role']}")
# TODO convert return type to pydantic
def convert_tools_to_google_ai_format(tools: List[Tool], inner_thoughts_in_kwargs: Optional[bool] = True) -> List[dict]:
"""
OpenAI style:
"tools": [{
"type": "function",
"function": {
"name": "find_movies",
"description": "find ....",
"parameters": {
"type": "object",
"properties": {
PARAM: {
"type": PARAM_TYPE, # eg "string"
"description": PARAM_DESCRIPTION,
},
...
},
"required": List[str],
}
}
}
]
Google AI style:
"tools": [{
"functionDeclarations": [{
"name": "find_movies",
"description": "find movie titles currently playing in theaters based on any description, genre, title words, etc.",
"parameters": {
"type": "OBJECT",
"properties": {
"location": {
"type": "STRING",
"description": "The city and state, e.g. San Francisco, CA or a zip code e.g. 95616"
},
"description": {
"type": "STRING",
"description": "Any kind of description including category or genre, title words, attributes, etc."
}
},
"required": ["description"]
}
}, {
"name": "find_theaters",
...
"""
function_list = [
dict(
name=t.function.name,
description=t.function.description,
parameters=t.function.parameters, # TODO need to unpack
)
for t in tools
]
# Correct casing + add inner thoughts if needed
for func in function_list:
func["parameters"]["type"] = "OBJECT"
for param_name, param_fields in func["parameters"]["properties"].items():
param_fields["type"] = param_fields["type"].upper()
# Add inner thoughts
if inner_thoughts_in_kwargs:
from letta.local_llm.constants import (
INNER_THOUGHTS_KWARG,
INNER_THOUGHTS_KWARG_DESCRIPTION,
)
func["parameters"]["properties"][INNER_THOUGHTS_KWARG] = {
"type": "STRING",
"description": INNER_THOUGHTS_KWARG_DESCRIPTION,
}
func["parameters"]["required"].append(INNER_THOUGHTS_KWARG)
return [{"functionDeclarations": function_list}]
def convert_google_ai_response_to_chatcompletion(
response_json: dict, # REST response from Google AI API
model: str, # Required since not returned
input_messages: Optional[List[dict]] = None, # Required if the API doesn't return UsageMetadata
pull_inner_thoughts_from_args: Optional[bool] = True,
) -> ChatCompletionResponse:
"""Google AI API response format is not the same as ChatCompletion, requires unpacking
Example:
{
"candidates": [
{
"content": {
"parts": [
{
"text": " OK. Barbie is showing in two theaters in Mountain View, CA: AMC Mountain View 16 and Regal Edwards 14."
}
]
}
}
],
"usageMetadata": {
"promptTokenCount": 9,
"candidatesTokenCount": 27,
"totalTokenCount": 36
}
}
"""
try:
choices = []
for candidate in response_json["candidates"]:
content = candidate["content"]
role = content["role"]
assert role == "model", f"Unknown role in response: {role}"
parts = content["parts"]
# TODO support parts / multimodal
assert len(parts) == 1, f"Multi-part not yet supported:\n{parts}"
response_message = parts[0]
# Convert the actual message style to OpenAI style
if "functionCall" in response_message and response_message["functionCall"] is not None:
function_call = response_message["functionCall"]
assert isinstance(function_call, dict), function_call
function_name = function_call["name"]
assert isinstance(function_name, str), function_name
function_args = function_call["args"]
assert isinstance(function_args, dict), function_args
# NOTE: this also involves stripping the inner monologue out of the function
if pull_inner_thoughts_from_args:
from letta.local_llm.constants import INNER_THOUGHTS_KWARG
assert INNER_THOUGHTS_KWARG in function_args, f"Couldn't find inner thoughts in function args:\n{function_call}"
inner_thoughts = function_args.pop(INNER_THOUGHTS_KWARG)
assert inner_thoughts is not None, f"Expected non-null inner thoughts function arg:\n{function_call}"
else:
inner_thoughts = None
# Google AI API doesn't generate tool call IDs
openai_response_message = Message(
role="assistant", # NOTE: "model" -> "assistant"
content=inner_thoughts,
tool_calls=[
ToolCall(
id=get_tool_call_id(),
type="function",
function=FunctionCall(
name=function_name,
arguments=clean_json_string_extra_backslash(json_dumps(function_args)),
),
)
],
)
else:
# Inner thoughts are the content by default
inner_thoughts = response_message["text"]
# Google AI API doesn't generate tool call IDs
openai_response_message = Message(
role="assistant", # NOTE: "model" -> "assistant"
content=inner_thoughts,
)
# Google AI API uses different finish reason strings than OpenAI
# OpenAI: 'stop', 'length', 'function_call', 'content_filter', null
# see: https://platform.openai.com/docs/guides/text-generation/chat-completions-api
# Google AI API: FINISH_REASON_UNSPECIFIED, STOP, MAX_TOKENS, SAFETY, RECITATION, OTHER
# see: https://ai.google.dev/api/python/google/ai/generativelanguage/Candidate/FinishReason
finish_reason = candidate["finishReason"]
if finish_reason == "STOP":
openai_finish_reason = (
"function_call"
if openai_response_message.tool_calls is not None and len(openai_response_message.tool_calls) > 0
else "stop"
)
elif finish_reason == "MAX_TOKENS":
openai_finish_reason = "length"
elif finish_reason == "SAFETY":
openai_finish_reason = "content_filter"
elif finish_reason == "RECITATION":
openai_finish_reason = "content_filter"
else:
raise ValueError(f"Unrecognized finish reason in Google AI response: {finish_reason}")
choices.append(
Choice(
finish_reason=openai_finish_reason,
index=candidate["index"],
message=openai_response_message,
)
)
if len(choices) > 1:
raise UserWarning(f"Unexpected number of candidates in response (expected 1, got {len(choices)})")
# NOTE: some of the Google AI APIs show UsageMetadata in the response, but it seems to not exist?
# "usageMetadata": {
# "promptTokenCount": 9,
# "candidatesTokenCount": 27,
# "totalTokenCount": 36
# }
if "usageMetadata" in response_json:
usage = UsageStatistics(
prompt_tokens=response_json["usageMetadata"]["promptTokenCount"],
completion_tokens=response_json["usageMetadata"]["candidatesTokenCount"],
total_tokens=response_json["usageMetadata"]["totalTokenCount"],
)
else:
# Count it ourselves
assert input_messages is not None, f"Didn't get UsageMetadata from the API response, so input_messages is required"
prompt_tokens = count_tokens(json_dumps(input_messages)) # NOTE: this is a very rough approximation
completion_tokens = count_tokens(json_dumps(openai_response_message.model_dump())) # NOTE: this is also approximate
total_tokens = prompt_tokens + completion_tokens
usage = UsageStatistics(
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=total_tokens,
)
response_id = str(uuid.uuid4())
return ChatCompletionResponse(
id=response_id,
choices=choices,
model=model, # NOTE: Google API doesn't pass back model in the response
created=get_utc_time(),
usage=usage,
)
except KeyError as e:
raise e
# TODO convert 'data' type to pydantic
def google_ai_chat_completions_request(
base_url: str,
model: str,
api_key: str,
data: dict,
key_in_header: bool = True,
add_postfunc_model_messages: bool = True,
# NOTE: Google AI API doesn't support mixing parts 'text' and 'function',
# so there's no clean way to put inner thoughts in the same message as a function call
inner_thoughts_in_kwargs: bool = True,
) -> ChatCompletionResponse:
"""https://ai.google.dev/docs/function_calling
From https://ai.google.dev/api/rest#service-endpoint:
"A service endpoint is a base URL that specifies the network address of an API service.
One service might have multiple service endpoints.
This service has the following service endpoint and all URIs below are relative to this service endpoint:
https://xxx.googleapis.com
"""
assert api_key is not None, "Missing api_key when calling Google AI"
url, headers = get_gemini_endpoint_and_headers(base_url, model, api_key, key_in_header, generate_content=True)
# data["contents"][-1]["role"] = "model"
if add_postfunc_model_messages:
data["contents"] = add_dummy_model_messages(data["contents"])
response_json = make_post_request(url, headers, data)
try:
return convert_google_ai_response_to_chatcompletion(
response_json=response_json,
model=data.get("model"),
input_messages=data["contents"],
pull_inner_thoughts_from_args=inner_thoughts_in_kwargs,
)
except Exception as conversion_error:
print(f"Error during response conversion: {conversion_error}")
raise conversion_error

323
letta/llm_api/helpers.py Normal file
View File

@@ -0,0 +1,323 @@
import copy
import json
import warnings
from collections import OrderedDict
from typing import Any, List, Union
import requests
from letta.constants import OPENAI_CONTEXT_WINDOW_ERROR_SUBSTRING
from letta.schemas.openai.chat_completion_response import ChatCompletionResponse, Choice
from letta.utils import json_dumps, printd
def _convert_to_structured_output_helper(property: dict) -> dict:
"""Convert a single JSON schema property to structured output format (recursive)"""
if "type" not in property:
raise ValueError(f"Property {property} is missing a type")
param_type = property["type"]
if "description" not in property:
# raise ValueError(f"Property {property} is missing a description")
param_description = None
else:
param_description = property["description"]
if param_type == "object":
if "properties" not in property:
raise ValueError(f"Property {property} of type object is missing properties")
properties = property["properties"]
property_dict = {
"type": "object",
"properties": {k: _convert_to_structured_output_helper(v) for k, v in properties.items()},
"additionalProperties": False,
"required": list(properties.keys()),
}
if param_description is not None:
property_dict["description"] = param_description
return property_dict
elif param_type == "array":
if "items" not in property:
raise ValueError(f"Property {property} of type array is missing items")
items = property["items"]
property_dict = {
"type": "array",
"items": _convert_to_structured_output_helper(items),
}
if param_description is not None:
property_dict["description"] = param_description
return property_dict
else:
property_dict = {
"type": param_type, # simple type
}
if param_description is not None:
property_dict["description"] = param_description
return property_dict
def convert_to_structured_output(openai_function: dict, allow_optional: bool = False) -> dict:
"""Convert function call objects to structured output objects
See: https://platform.openai.com/docs/guides/structured-outputs/supported-schemas
"""
description = openai_function["description"] if "description" in openai_function else ""
structured_output = {
"name": openai_function["name"],
"description": description,
"strict": True,
"parameters": {
"type": "object",
"properties": {},
"additionalProperties": False,
"required": [],
},
}
# This code needs to be able to handle nested properties
# For example, the param details may have "type" + "description",
# but if "type" is "object" we expected "properties", where each property has details
# and if "type" is "array" we expect "items": <type>
for param, details in openai_function["parameters"]["properties"].items():
param_type = details["type"]
description = details["description"]
if param_type == "object":
if "properties" not in details:
# Structured outputs requires the properties on dicts be specified ahead of time
raise ValueError(f"Property {param} of type object is missing properties")
structured_output["parameters"]["properties"][param] = {
"type": "object",
"description": description,
"properties": {k: _convert_to_structured_output_helper(v) for k, v in details["properties"].items()},
"additionalProperties": False,
"required": list(details["properties"].keys()),
}
elif param_type == "array":
structured_output["parameters"]["properties"][param] = {
"type": "array",
"description": description,
"items": _convert_to_structured_output_helper(details["items"]),
}
else:
structured_output["parameters"]["properties"][param] = {
"type": param_type, # simple type
"description": description,
}
if "enum" in details:
structured_output["parameters"]["properties"][param]["enum"] = details["enum"]
if not allow_optional:
# Add all properties to required list
structured_output["parameters"]["required"] = list(structured_output["parameters"]["properties"].keys())
else:
# See what parameters exist that aren't required
# Those are implied "optional" types
# For those types, turn each of them into a union type with "null"
# e.g.
# "type": "string" -> "type": ["string", "null"]
# TODO
raise NotImplementedError
return structured_output
def make_post_request(url: str, headers: dict[str, str], data: dict[str, Any]) -> dict[str, Any]:
printd(f"Sending request to {url}")
try:
# Make the POST request
response = requests.post(url, headers=headers, json=data)
printd(f"Response status code: {response.status_code}")
# Raise for 4XX/5XX HTTP errors
response.raise_for_status()
# Check if the response content type indicates JSON and attempt to parse it
content_type = response.headers.get("Content-Type", "")
if "application/json" in content_type.lower():
try:
response_data = response.json() # Attempt to parse the response as JSON
printd(f"Response JSON: {response_data}")
except ValueError as json_err:
# Handle the case where the content type says JSON but the body is invalid
error_message = f"Failed to parse JSON despite Content-Type being {content_type}: {json_err}"
printd(error_message)
raise ValueError(error_message) from json_err
else:
error_message = f"Unexpected content type returned: {response.headers.get('Content-Type')}"
printd(error_message)
raise ValueError(error_message)
# Process the response using the callback function
return response_data
except requests.exceptions.HTTPError as http_err:
# HTTP errors (4XX, 5XX)
error_message = f"HTTP error occurred: {http_err}"
if http_err.response is not None:
error_message += f" | Status code: {http_err.response.status_code}, Message: {http_err.response.text}"
printd(error_message)
raise requests.exceptions.HTTPError(error_message) from http_err
except requests.exceptions.Timeout as timeout_err:
# Handle timeout errors
error_message = f"Request timed out: {timeout_err}"
printd(error_message)
raise requests.exceptions.Timeout(error_message) from timeout_err
except requests.exceptions.RequestException as req_err:
# Non-HTTP errors (e.g., connection, SSL errors)
error_message = f"Request failed: {req_err}"
printd(error_message)
raise requests.exceptions.RequestException(error_message) from req_err
except ValueError as val_err:
# Handle content-type or non-JSON response issues
error_message = f"ValueError: {val_err}"
printd(error_message)
raise ValueError(error_message) from val_err
except Exception as e:
# Catch any other unknown exceptions
error_message = f"An unexpected error occurred: {e}"
printd(error_message)
raise Exception(error_message) from e
# TODO update to use better types
def add_inner_thoughts_to_functions(
functions: List[dict],
inner_thoughts_key: str,
inner_thoughts_description: str,
inner_thoughts_required: bool = True,
) -> List[dict]:
"""Add an inner_thoughts kwarg to every function in the provided list, ensuring it's the first parameter"""
new_functions = []
for function_object in functions:
new_function_object = copy.deepcopy(function_object)
# Create a new OrderedDict with inner_thoughts as the first item
new_properties = OrderedDict()
new_properties[inner_thoughts_key] = {
"type": "string",
"description": inner_thoughts_description,
}
# Add the rest of the properties
new_properties.update(function_object["parameters"]["properties"])
# Cast OrderedDict back to a regular dict
new_function_object["parameters"]["properties"] = dict(new_properties)
# Update required parameters if necessary
if inner_thoughts_required:
required_params = new_function_object["parameters"].get("required", [])
if inner_thoughts_key not in required_params:
required_params.insert(0, inner_thoughts_key)
new_function_object["parameters"]["required"] = required_params
new_functions.append(new_function_object)
return new_functions
def unpack_all_inner_thoughts_from_kwargs(
response: ChatCompletionResponse,
inner_thoughts_key: str,
) -> ChatCompletionResponse:
"""Strip the inner thoughts out of the tool call and put it in the message content"""
if len(response.choices) == 0:
raise ValueError(f"Unpacking inner thoughts from empty response not supported")
new_choices = []
for choice in response.choices:
new_choices.append(unpack_inner_thoughts_from_kwargs(choice, inner_thoughts_key))
# return an updated copy
new_response = response.model_copy(deep=True)
new_response.choices = new_choices
return new_response
def unpack_inner_thoughts_from_kwargs(choice: Choice, inner_thoughts_key: str) -> Choice:
message = choice.message
if message.role == "assistant" and message.tool_calls and len(message.tool_calls) >= 1:
if len(message.tool_calls) > 1:
warnings.warn(f"Unpacking inner thoughts from more than one tool call ({len(message.tool_calls)}) is not supported")
# TODO support multiple tool calls
tool_call = message.tool_calls[0]
try:
# Sadly we need to parse the JSON since args are in string format
func_args = dict(json.loads(tool_call.function.arguments))
if inner_thoughts_key in func_args:
# extract the inner thoughts
inner_thoughts = func_args.pop(inner_thoughts_key)
# replace the kwargs
new_choice = choice.model_copy(deep=True)
new_choice.message.tool_calls[0].function.arguments = json_dumps(func_args)
# also replace the message content
if new_choice.message.content is not None:
warnings.warn(f"Overwriting existing inner monologue ({new_choice.message.content}) with kwarg ({inner_thoughts})")
new_choice.message.content = inner_thoughts
return new_choice
else:
warnings.warn(f"Did not find inner thoughts in tool call: {str(tool_call)}")
return choice
except json.JSONDecodeError as e:
warnings.warn(f"Failed to strip inner thoughts from kwargs: {e}")
raise e
def is_context_overflow_error(exception: Union[requests.exceptions.RequestException, Exception]) -> bool:
"""Checks if an exception is due to context overflow (based on common OpenAI response messages)"""
from letta.utils import printd
match_string = OPENAI_CONTEXT_WINDOW_ERROR_SUBSTRING
# Backwards compatibility with openai python package/client v0.28 (pre-v1 client migration)
if match_string in str(exception):
printd(f"Found '{match_string}' in str(exception)={(str(exception))}")
return True
# Based on python requests + OpenAI REST API (/v1)
elif isinstance(exception, requests.exceptions.HTTPError):
if exception.response is not None and "application/json" in exception.response.headers.get("Content-Type", ""):
try:
error_details = exception.response.json()
if "error" not in error_details:
printd(f"HTTPError occurred, but couldn't find error field: {error_details}")
return False
else:
error_details = error_details["error"]
# Check for the specific error code
if error_details.get("code") == "context_length_exceeded":
printd(f"HTTPError occurred, caught error code {error_details.get('code')}")
return True
# Soft-check for "maximum context length" inside of the message
elif error_details.get("message") and "maximum context length" in error_details.get("message"):
printd(f"HTTPError occurred, found '{match_string}' in error message contents ({error_details})")
return True
else:
printd(f"HTTPError occurred, but unknown error message: {error_details}")
return False
except ValueError:
# JSON decoding failed
printd(f"HTTPError occurred ({exception}), but no JSON error message.")
# Generic fail
else:
return False

View File

@@ -0,0 +1,405 @@
import random
import time
from typing import List, Optional, Union
import requests
from letta.constants import CLI_WARNING_PREFIX
from letta.errors import LettaConfigurationError, RateLimitExceededError
from letta.llm_api.anthropic import anthropic_chat_completions_request
from letta.llm_api.azure_openai import azure_openai_chat_completions_request
from letta.llm_api.google_ai import (
convert_tools_to_google_ai_format,
google_ai_chat_completions_request,
)
from letta.llm_api.helpers import (
add_inner_thoughts_to_functions,
unpack_all_inner_thoughts_from_kwargs,
)
from letta.llm_api.openai import (
build_openai_chat_completions_request,
openai_chat_completions_process_stream,
openai_chat_completions_request,
)
from letta.local_llm.chat_completion_proxy import get_chat_completion
from letta.local_llm.constants import (
INNER_THOUGHTS_KWARG,
INNER_THOUGHTS_KWARG_DESCRIPTION,
)
from letta.local_llm.utils import num_tokens_from_functions, num_tokens_from_messages
from letta.schemas.llm_config import LLMConfig
from letta.schemas.message import Message
from letta.schemas.openai.chat_completion_request import (
ChatCompletionRequest,
Tool,
cast_message_to_subtype,
)
from letta.schemas.openai.chat_completion_response import ChatCompletionResponse
from letta.settings import ModelSettings
from letta.streaming_interface import (
AgentChunkStreamingInterface,
AgentRefreshStreamingInterface,
)
LLM_API_PROVIDER_OPTIONS = ["openai", "azure", "anthropic", "google_ai", "cohere", "local", "groq"]
def retry_with_exponential_backoff(
func,
initial_delay: float = 1,
exponential_base: float = 2,
jitter: bool = True,
max_retries: int = 20,
# List of OpenAI error codes: https://github.com/openai/openai-python/blob/17ac6779958b2b74999c634c4ea4c7b74906027a/src/openai/_client.py#L227-L250
# 429 = rate limit
error_codes: tuple = (429,),
):
"""Retry a function with exponential backoff."""
def wrapper(*args, **kwargs):
# Initialize variables
num_retries = 0
delay = initial_delay
# Loop until a successful response or max_retries is hit or an exception is raised
while True:
try:
return func(*args, **kwargs)
except requests.exceptions.HTTPError as http_err:
if not hasattr(http_err, "response") or http_err.response is None:
# NOTE: compare against None explicitly; a 4xx/5xx Response is falsy, which would otherwise skip the retry logic below
raise
# Retry on specified errors
if http_err.response.status_code in error_codes:
# Increment retries
num_retries += 1
# Check if max retries has been reached
if num_retries > max_retries:
raise RateLimitExceededError("Maximum number of retries exceeded", max_retries=max_retries)
# Increment the delay
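# with the defaults (exponential_base=2, jitter=True) each wait is 2-4x the previous one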
delay *= exponential_base * (1 + jitter * random.random())
# Sleep for the delay
# printd(f"Got a rate limit error ('{http_err}') on LLM backend request, waiting {int(delay)}s then retrying...")
print(
f"{CLI_WARNING_PREFIX}Got a rate limit error ('{http_err}') on LLM backend request, waiting {int(delay)}s then retrying..."
)
time.sleep(delay)
else:
# For other HTTP errors, re-raise the exception
raise
# Raise exceptions for any errors not specified
except Exception as e:
raise e
return wrapper
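# Illustrative usage sketch (function name and URL are placeholders, not part of this module):
#   @retry_with_exponential_backoff
#   def call_backend(payload: dict) -> dict:
#       resp = requests.post("https://example.invalid/v1/chat/completions", json=payload)
#       resp.raise_for_status()
#       return resp.json()
# Non-429 HTTP errors are re-raised immediately; after max_retries consecutive 429s
# a RateLimitExceededError is raised.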
@retry_with_exponential_backoff
def create(
# agent_state: AgentState,
llm_config: LLMConfig,
messages: List[Message],
user_id: Optional[str] = None, # optional UUID to associate request with
functions: Optional[list] = None,
functions_python: Optional[dict] = None,
function_call: str = "auto",
# hint
first_message: bool = False,
force_tool_call: Optional[str] = None, # Force a specific tool to be called
# use tool naming?
# if false, will use deprecated 'functions' style
use_tool_naming: bool = True,
# streaming?
stream: bool = False,
stream_interface: Optional[Union[AgentRefreshStreamingInterface, AgentChunkStreamingInterface]] = None,
max_tokens: Optional[int] = None,
model_settings: Optional[dict] = None, # TODO: eventually pass from server
) -> ChatCompletionResponse:
"""Return response to chat completion with backoff"""
from letta.utils import printd
# Count the tokens first, if there's an overflow exit early by throwing an error up the stack
# NOTE: we want to include a specific substring in the error message to trigger summarization
messages_oai_format = [m.to_openai_dict() for m in messages]
prompt_tokens = num_tokens_from_messages(messages=messages_oai_format, model=llm_config.model)
function_tokens = num_tokens_from_functions(functions=functions, model=llm_config.model) if functions else 0
if prompt_tokens + function_tokens > llm_config.context_window:
raise Exception(f"Request exceeds maximum context length ({prompt_tokens + function_tokens} > {llm_config.context_window} tokens)")
if not model_settings:
from letta.settings import model_settings
model_settings = model_settings
assert isinstance(model_settings, ModelSettings)
printd(f"Using model {llm_config.model_endpoint_type}, endpoint: {llm_config.model_endpoint}")
if function_call and not functions:
printd("unsetting function_call because functions is None")
function_call = None
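# Dispatch on llm_config.model_endpoint_type: each branch below builds a provider-specific
# request and normalizes the result into an OpenAI-style ChatCompletionResponse.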
# openai
if llm_config.model_endpoint_type == "openai":
if model_settings.openai_api_key is None and llm_config.model_endpoint == "https://api.openai.com/v1":
# only a problem if we are *not* using an openai proxy
raise LettaConfigurationError(message="OpenAI key is missing from letta config file", missing_fields=["openai_api_key"])
data = build_openai_chat_completions_request(llm_config, messages, user_id, functions, function_call, use_tool_naming, max_tokens)
if stream: # Client requested token streaming
data.stream = True
assert isinstance(stream_interface, AgentChunkStreamingInterface) or isinstance(
stream_interface, AgentRefreshStreamingInterface
), type(stream_interface)
response = openai_chat_completions_process_stream(
url=llm_config.model_endpoint, # https://api.openai.com/v1 -> https://api.openai.com/v1/chat/completions
api_key=model_settings.openai_api_key,
chat_completion_request=data,
stream_interface=stream_interface,
)
else: # Client did not request token streaming (expect a blocking backend response)
data.stream = False
if isinstance(stream_interface, AgentChunkStreamingInterface):
stream_interface.stream_start()
try:
response = openai_chat_completions_request(
url=llm_config.model_endpoint, # https://api.openai.com/v1 -> https://api.openai.com/v1/chat/completions
api_key=model_settings.openai_api_key,
chat_completion_request=data,
)
finally:
if isinstance(stream_interface, AgentChunkStreamingInterface):
stream_interface.stream_end()
if llm_config.put_inner_thoughts_in_kwargs:
response = unpack_all_inner_thoughts_from_kwargs(response=response, inner_thoughts_key=INNER_THOUGHTS_KWARG)
return response
# azure
elif llm_config.model_endpoint_type == "azure":
if stream:
raise NotImplementedError(f"Streaming not yet implemented for {llm_config.model_endpoint_type}")
if model_settings.azure_api_key is None:
raise LettaConfigurationError(
message="Azure API key is missing. Did you set AZURE_API_KEY in your env?", missing_fields=["azure_api_key"]
)
if model_settings.azure_base_url is None:
raise LettaConfigurationError(
message="Azure base url is missing. Did you set AZURE_BASE_URL in your env?", missing_fields=["azure_base_url"]
)
if model_settings.azure_api_version is None:
raise LettaConfigurationError(
message="Azure API version is missing. Did you set AZURE_API_VERSION in your env?", missing_fields=["azure_api_version"]
)
# Set the llm config model_endpoint from model_settings
# For Azure, this model_endpoint is required to be configured via env variable, so users don't need to provide it in the LLM config
llm_config.model_endpoint = model_settings.azure_base_url
chat_completion_request = build_openai_chat_completions_request(
llm_config, messages, user_id, functions, function_call, use_tool_naming, max_tokens
)
response = azure_openai_chat_completions_request(
model_settings=model_settings,
llm_config=llm_config,
api_key=model_settings.azure_api_key,
chat_completion_request=chat_completion_request,
)
if llm_config.put_inner_thoughts_in_kwargs:
response = unpack_all_inner_thoughts_from_kwargs(response=response, inner_thoughts_key=INNER_THOUGHTS_KWARG)
return response
elif llm_config.model_endpoint_type == "google_ai":
if stream:
raise NotImplementedError(f"Streaming not yet implemented for {llm_config.model_endpoint_type}")
if not use_tool_naming:
raise NotImplementedError("Only tool calling supported on Google AI API requests")
if functions is not None:
tools = [{"type": "function", "function": f} for f in functions]
tools = [Tool(**t) for t in tools]
tools = convert_tools_to_google_ai_format(tools, inner_thoughts_in_kwargs=llm_config.put_inner_thoughts_in_kwargs)
else:
tools = None
return google_ai_chat_completions_request(
base_url=llm_config.model_endpoint,
model=llm_config.model,
api_key=model_settings.gemini_api_key,
# see structure of payload here: https://ai.google.dev/docs/function_calling
data=dict(
contents=[m.to_google_ai_dict() for m in messages],
tools=tools,
),
inner_thoughts_in_kwargs=llm_config.put_inner_thoughts_in_kwargs,
)
elif llm_config.model_endpoint_type == "anthropic":
if stream:
raise NotImplementedError(f"Streaming not yet implemented for {llm_config.model_endpoint_type}")
if not use_tool_naming:
raise NotImplementedError("Only tool calling supported on Anthropic API requests")
tool_call = None
if force_tool_call is not None:
tool_call = {
"type": "function",
"function": {
"name": force_tool_call
}
}
assert functions is not None
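# NOTE: like the other branches, the Anthropic response is converted into the
# OpenAI-style ChatCompletionResponse expected by callers of create()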
return anthropic_chat_completions_request(
url=llm_config.model_endpoint,
api_key=model_settings.anthropic_api_key,
data=ChatCompletionRequest(
model=llm_config.model,
messages=[cast_message_to_subtype(m.to_openai_dict()) for m in messages],
tools=[{"type": "function", "function": f} for f in functions] if functions else None,
tool_choice=tool_call,
# user=str(user_id),
# NOTE: max_tokens is required for Anthropic API
max_tokens=1024, # TODO make dynamic
),
)
# elif llm_config.model_endpoint_type == "cohere":
# if stream:
# raise NotImplementedError(f"Streaming not yet implemented for {llm_config.model_endpoint_type}")
# if not use_tool_naming:
# raise NotImplementedError("Only tool calling supported on Cohere API requests")
#
# if functions is not None:
# tools = [{"type": "function", "function": f} for f in functions]
# tools = [Tool(**t) for t in tools]
# else:
# tools = None
#
# return cohere_chat_completions_request(
# # url=llm_config.model_endpoint,
# url="https://api.cohere.ai/v1", # TODO
# api_key=os.getenv("COHERE_API_KEY"), # TODO remove
# chat_completion_request=ChatCompletionRequest(
# model="command-r-plus", # TODO
# messages=[cast_message_to_subtype(m.to_openai_dict()) for m in messages],
# tools=tools,
# tool_choice=function_call,
# # user=str(user_id),
# # NOTE: max_tokens is required for Anthropic API
# # max_tokens=1024, # TODO make dynamic
# ),
# )
elif llm_config.model_endpoint_type == "groq":
if stream:
raise NotImplementedError(f"Streaming not yet implemented for Groq.")
if model_settings.groq_api_key is None and llm_config.model_endpoint == "https://api.groq.com/openai/v1/chat/completions":
raise LettaConfigurationError(message="Groq key is missing from letta config file", missing_fields=["groq_api_key"])
# force to true for groq, since they don't support 'content' being non-null
if llm_config.put_inner_thoughts_in_kwargs:
functions = add_inner_thoughts_to_functions(
functions=functions,
inner_thoughts_key=INNER_THOUGHTS_KWARG,
inner_thoughts_description=INNER_THOUGHTS_KWARG_DESCRIPTION,
)
tools = [{"type": "function", "function": f} for f in functions] if functions is not None else None
data = ChatCompletionRequest(
model=llm_config.model,
messages=[m.to_openai_dict(put_inner_thoughts_in_kwargs=llm_config.put_inner_thoughts_in_kwargs) for m in messages],
tools=tools,
tool_choice=function_call,
user=str(user_id),
)
# https://console.groq.com/docs/openai
# "The following fields are currently not supported and will result in a 400 error (yikes) if they are supplied:"
assert data.top_logprobs is None
assert data.logit_bias is None
assert data.logprobs == False
assert data.n == 1
# They mention that none of the messages can have names, but it seems to not error out (for now)
data.stream = False
if isinstance(stream_interface, AgentChunkStreamingInterface):
stream_interface.stream_start()
try:
# groq uses the openai chat completions API, so this component should be reusable
response = openai_chat_completions_request(
url=llm_config.model_endpoint,
api_key=model_settings.groq_api_key,
chat_completion_request=data,
)
finally:
if isinstance(stream_interface, AgentChunkStreamingInterface):
stream_interface.stream_end()
if llm_config.put_inner_thoughts_in_kwargs:
response = unpack_all_inner_thoughts_from_kwargs(response=response, inner_thoughts_key=INNER_THOUGHTS_KWARG)
return response
elif llm_config.model_endpoint_type == "together":
"""TogetherAI endpoint that goes via /completions instead of /chat/completions"""
if stream:
raise NotImplementedError(f"Streaming not yet implemented for TogetherAI (via the /completions endpoint).")
if model_settings.together_api_key is None and llm_config.model_endpoint == "https://api.together.ai/v1/completions":
raise LettaConfigurationError(message="TogetherAI key is missing from letta config file", missing_fields=["together_api_key"])
return get_chat_completion(
model=llm_config.model,
messages=messages,
functions=functions,
functions_python=functions_python,
function_call=function_call,
context_window=llm_config.context_window,
endpoint=llm_config.model_endpoint,
endpoint_type="vllm", # NOTE: use the vLLM path through /completions
wrapper=llm_config.model_wrapper,
user=str(user_id),
# hint
first_message=first_message,
# auth-related
auth_type="bearer_token", # NOTE: Together expects bearer token auth
auth_key=model_settings.together_api_key,
)
# local model
else:
if stream:
raise NotImplementedError(f"Streaming not yet implemented for {llm_config.model_endpoint_type}")
return get_chat_completion(
model=llm_config.model,
messages=messages,
functions=functions,
functions_python=functions_python,
function_call=function_call,
context_window=llm_config.context_window,
endpoint=llm_config.model_endpoint,
endpoint_type=llm_config.model_endpoint_type,
wrapper=llm_config.model_wrapper,
user=str(user_id),
# hint
first_message=first_message,
# auth-related
auth_type=model_settings.openllm_auth_type,
auth_key=model_settings.openllm_api_key,
)

47
letta/llm_api/mistral.py Normal file
View File

@@ -0,0 +1,47 @@
import requests
from letta.utils import printd, smart_urljoin
def mistral_get_model_list(url: str, api_key: str) -> dict:
url = smart_urljoin(url, "models")
headers = {"Content-Type": "application/json"}
if api_key is not None:
headers["Authorization"] = f"Bearer {api_key}"
printd(f"Sending request to {url}")
response = None
try:
# TODO add query param "tool" to be true
response = requests.get(url, headers=headers)
response.raise_for_status() # Raises HTTPError for 4XX/5XX status
response_json = response.json() # convert to dict from string
return response_json
except requests.exceptions.HTTPError as http_err:
# Handle HTTP errors (e.g., response 4XX, 5XX)
try:
if response:
response = response.json()
except:
pass
printd(f"Got HTTPError, exception={http_err}, response={response}")
raise http_err
except requests.exceptions.RequestException as req_err:
# Handle other requests-related errors (e.g., connection error)
try:
if response:
response = response.json()
except:
pass
printd(f"Got RequestException, exception={req_err}, response={response}")
raise req_err
except Exception as e:
# Handle other potential errors
try:
if response:
response = response.json()
except:
pass
printd(f"Got unknown Exception, exception={e}, response={response}")
raise e
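# Illustrative usage sketch (endpoint and key are placeholders; assumes the endpoint
# returns an OpenAI-style model list with a "data" array):
#   models = mistral_get_model_list("https://api.mistral.ai/v1", api_key="...")
#   print([m.get("id") for m in models.get("data", [])])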

553
letta/llm_api/openai.py Normal file
View File

@@ -0,0 +1,553 @@
import json
import warnings
from typing import Generator, List, Optional, Union
import httpx
import requests
from httpx_sse import connect_sse
from httpx_sse._exceptions import SSEError
from letta.constants import OPENAI_CONTEXT_WINDOW_ERROR_SUBSTRING
from letta.errors import LLMError
from letta.llm_api.helpers import (
add_inner_thoughts_to_functions,
convert_to_structured_output,
make_post_request,
)
from letta.local_llm.constants import (
INNER_THOUGHTS_KWARG,
INNER_THOUGHTS_KWARG_DESCRIPTION,
)
from letta.local_llm.utils import num_tokens_from_functions, num_tokens_from_messages
from letta.schemas.llm_config import LLMConfig
from letta.schemas.message import Message as _Message
from letta.schemas.message import MessageRole as _MessageRole
from letta.schemas.openai.chat_completion_request import ChatCompletionRequest
from letta.schemas.openai.chat_completion_request import (
FunctionCall as ToolFunctionChoiceFunctionCall,
)
from letta.schemas.openai.chat_completion_request import (
Tool,
ToolFunctionChoice,
cast_message_to_subtype,
)
from letta.schemas.openai.chat_completion_response import (
ChatCompletionChunkResponse,
ChatCompletionResponse,
Choice,
FunctionCall,
Message,
ToolCall,
UsageStatistics,
)
from letta.schemas.openai.embedding_response import EmbeddingResponse
from letta.streaming_interface import (
AgentChunkStreamingInterface,
AgentRefreshStreamingInterface,
)
from letta.utils import get_tool_call_id, smart_urljoin
OPENAI_SSE_DONE = "[DONE]"
def openai_get_model_list(
url: str, api_key: Union[str, None], fix_url: Optional[bool] = False, extra_params: Optional[dict] = None
) -> dict:
"""https://platform.openai.com/docs/api-reference/models/list"""
from letta.utils import printd
# In some cases we may want to double-check the URL and do basic correction, eg:
# In Letta config the address for vLLM is w/o a /v1 suffix for simplicity
# However if we're treating the server as an OpenAI proxy we want the /v1 suffix on our model hit
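# e.g. a vLLM server address like "http://localhost:8000" gets the "/v1" suffix here
# before "models" is appended below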
if fix_url:
if not url.endswith("/v1"):
url = smart_urljoin(url, "v1")
url = smart_urljoin(url, "models")
headers = {"Content-Type": "application/json"}
if api_key is not None:
headers["Authorization"] = f"Bearer {api_key}"
printd(f"Sending request to {url}")
response = None
try:
# TODO add query param "tool" to be true
response = requests.get(url, headers=headers, params=extra_params)
response.raise_for_status() # Raises HTTPError for 4XX/5XX status
response = response.json() # convert to dict from string
printd(f"response = {response}")
return response
except requests.exceptions.HTTPError as http_err:
# Handle HTTP errors (e.g., response 4XX, 5XX)
try:
if response:
response = response.json()
except:
pass
printd(f"Got HTTPError, exception={http_err}, response={response}")
raise http_err
except requests.exceptions.RequestException as req_err:
# Handle other requests-related errors (e.g., connection error)
try:
if response:
response = response.json()
except:
pass
printd(f"Got RequestException, exception={req_err}, response={response}")
raise req_err
except Exception as e:
# Handle other potential errors
try:
if response:
response = response.json()
except:
pass
printd(f"Got unknown Exception, exception={e}, response={response}")
raise e
def build_openai_chat_completions_request(
llm_config: LLMConfig,
messages: List[_Message],
user_id: Optional[str],
functions: Optional[list],
function_call: Optional[str],
use_tool_naming: bool,
max_tokens: Optional[int],
) -> ChatCompletionRequest:
if functions and llm_config.put_inner_thoughts_in_kwargs:
functions = add_inner_thoughts_to_functions(
functions=functions,
inner_thoughts_key=INNER_THOUGHTS_KWARG,
inner_thoughts_description=INNER_THOUGHTS_KWARG_DESCRIPTION,
)
openai_message_list = [
cast_message_to_subtype(m.to_openai_dict(put_inner_thoughts_in_kwargs=llm_config.put_inner_thoughts_in_kwargs)) for m in messages
]
if llm_config.model:
model = llm_config.model
else:
warnings.warn(f"Model type not set in llm_config: {llm_config.model_dump_json(indent=4)}")
model = None
if use_tool_naming:
if function_call is None:
tool_choice = None
elif function_call not in ["none", "auto", "required"]:
tool_choice = ToolFunctionChoice(type="function", function=ToolFunctionChoiceFunctionCall(name=function_call))
else:
tool_choice = function_call
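# a concrete function name in function_call becomes a forced ToolFunctionChoice;
# "none"/"auto"/"required" (or None) are passed through unchanged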
data = ChatCompletionRequest(
model=model,
messages=openai_message_list,
tools=[Tool(type="function", function=f) for f in functions] if functions else None,
tool_choice=tool_choice,
user=str(user_id),
max_tokens=max_tokens,
)
else:
data = ChatCompletionRequest(
model=model,
messages=openai_message_list,
functions=functions,
function_call=function_call,
user=str(user_id),
max_tokens=max_tokens,
)
# https://platform.openai.com/docs/guides/text-generation/json-mode
# only supported by gpt-4o, gpt-4-turbo, or gpt-3.5-turbo
# if "gpt-4o" in llm_config.model or "gpt-4-turbo" in llm_config.model or "gpt-3.5-turbo" in llm_config.model:
# data.response_format = {"type": "json_object"}
if "inference.memgpt.ai" in llm_config.model_endpoint:
# override user id for inference.memgpt.ai
import uuid
data.user = str(uuid.UUID(int=0))
data.model = "memgpt-openai"
return data
def openai_chat_completions_process_stream(
url: str,
api_key: str,
chat_completion_request: ChatCompletionRequest,
stream_interface: Optional[Union[AgentChunkStreamingInterface, AgentRefreshStreamingInterface]] = None,
create_message_id: bool = True,
create_message_datetime: bool = True,
override_tool_call_id: bool = True,
) -> ChatCompletionResponse:
"""Process a streaming completion response, and return a ChatCompletionRequest at the end.
To "stream" the response in Letta, we want to call a streaming-compatible interface function
on the chunks received from the OpenAI-compatible server POST SSE response.
"""
assert chat_completion_request.stream == True
assert stream_interface is not None, "Required"
# Count the prompt tokens
# TODO move to post-request?
chat_history = [m.model_dump(exclude_none=True) for m in chat_completion_request.messages]
# print(chat_history)
prompt_tokens = num_tokens_from_messages(
messages=chat_history,
model=chat_completion_request.model,
)
# We also need to add the cost of including the functions list to the input prompt
if chat_completion_request.tools is not None:
assert chat_completion_request.functions is None
prompt_tokens += num_tokens_from_functions(
functions=[t.function.model_dump() for t in chat_completion_request.tools],
model=chat_completion_request.model,
)
elif chat_completion_request.functions is not None:
assert chat_completion_request.tools is None
prompt_tokens += num_tokens_from_functions(
functions=[f.model_dump() for f in chat_completion_request.functions],
model=chat_completion_request.model,
)
# Create a dummy Message object to get an ID and date
# TODO(sarah): add message ID generation function
dummy_message = _Message(
role=_MessageRole.assistant,
text="",
agent_id="",
model="",
name=None,
tool_calls=None,
tool_call_id=None,
)
TEMP_STREAM_RESPONSE_ID = "temp_id"
TEMP_STREAM_FINISH_REASON = "temp_null"
TEMP_STREAM_TOOL_CALL_ID = "temp_id"
chat_completion_response = ChatCompletionResponse(
id=dummy_message.id if create_message_id else TEMP_STREAM_RESPONSE_ID,
choices=[],
created=dummy_message.created_at, # NOTE: doesn't matter since both will do get_utc_time()
model=chat_completion_request.model,
usage=UsageStatistics(
completion_tokens=0,
prompt_tokens=prompt_tokens,
total_tokens=prompt_tokens,
),
)
if stream_interface:
stream_interface.stream_start()
n_chunks = 0 # approx == n_tokens
try:
for chunk_idx, chat_completion_chunk in enumerate(
openai_chat_completions_request_stream(url=url, api_key=api_key, chat_completion_request=chat_completion_request)
):
assert isinstance(chat_completion_chunk, ChatCompletionChunkResponse), type(chat_completion_chunk)
# NOTE: this assumes that the tool call ID will only appear in one of the chunks during the stream
if override_tool_call_id:
for choice in chat_completion_chunk.choices:
if choice.delta.tool_calls and len(choice.delta.tool_calls) > 0:
for tool_call in choice.delta.tool_calls:
if tool_call.id is not None:
tool_call.id = get_tool_call_id()
if stream_interface:
if isinstance(stream_interface, AgentChunkStreamingInterface):
stream_interface.process_chunk(
chat_completion_chunk,
message_id=chat_completion_response.id if create_message_id else chat_completion_chunk.id,
message_date=chat_completion_response.created if create_message_datetime else chat_completion_chunk.created,
)
elif isinstance(stream_interface, AgentRefreshStreamingInterface):
stream_interface.process_refresh(chat_completion_response)
else:
raise TypeError(stream_interface)
if chunk_idx == 0:
# initialize the choice objects which we will increment with the deltas
num_choices = len(chat_completion_chunk.choices)
assert num_choices > 0
chat_completion_response.choices = [
Choice(
finish_reason=TEMP_STREAM_FINISH_REASON, # NOTE: needs to be overwritten
index=i,
message=Message(
role="assistant",
),
)
for i in range(len(chat_completion_chunk.choices))
]
# add the choice delta
assert len(chat_completion_chunk.choices) == len(chat_completion_response.choices), chat_completion_chunk
for chunk_choice in chat_completion_chunk.choices:
if chunk_choice.finish_reason is not None:
chat_completion_response.choices[chunk_choice.index].finish_reason = chunk_choice.finish_reason
if chunk_choice.logprobs is not None:
chat_completion_response.choices[chunk_choice.index].logprobs = chunk_choice.logprobs
accum_message = chat_completion_response.choices[chunk_choice.index].message
message_delta = chunk_choice.delta
if message_delta.content is not None:
content_delta = message_delta.content
if accum_message.content is None:
accum_message.content = content_delta
else:
accum_message.content += content_delta
# TODO(charles) make sure this works for parallel tool calling?
if message_delta.tool_calls is not None:
tool_calls_delta = message_delta.tool_calls
# If this is the first tool call showing up in a chunk, initialize the list with it
if accum_message.tool_calls is None:
accum_message.tool_calls = [
ToolCall(id=TEMP_STREAM_TOOL_CALL_ID, function=FunctionCall(name="", arguments=""))
for _ in range(len(tool_calls_delta))
]
# There may be many tool calls in a tool calls delta (e.g. parallel tool calls)
for tool_call_delta in tool_calls_delta:
if tool_call_delta.id is not None:
# TODO assert that we're not overwriting?
# TODO += instead of =?
if tool_call_delta.index not in range(len(accum_message.tool_calls)):
warnings.warn(
f"Tool call index out of range ({tool_call_delta.index})\ncurrent tool calls: {accum_message.tool_calls}\ncurrent delta: {tool_call_delta}"
)
else:
accum_message.tool_calls[tool_call_delta.index].id = tool_call_delta.id
if tool_call_delta.function is not None:
if tool_call_delta.function.name is not None:
# TODO assert that we're not overwriting?
# TODO += instead of =?
accum_message.tool_calls[tool_call_delta.index].function.name = tool_call_delta.function.name
if tool_call_delta.function.arguments is not None:
accum_message.tool_calls[tool_call_delta.index].function.arguments += tool_call_delta.function.arguments
if message_delta.function_call is not None:
raise NotImplementedError(f"Old function_call style not support with stream=True")
# overwrite response fields based on latest chunk
if not create_message_id:
chat_completion_response.id = chat_completion_chunk.id
if not create_message_datetime:
chat_completion_response.created = chat_completion_chunk.created
chat_completion_response.model = chat_completion_chunk.model
chat_completion_response.system_fingerprint = chat_completion_chunk.system_fingerprint
# increment chunk counter
n_chunks += 1
except Exception as e:
if stream_interface:
stream_interface.stream_end()
print(f"Parsing ChatCompletion stream failed with error:\n{str(e)}")
raise e
finally:
if stream_interface:
stream_interface.stream_end()
# make sure we didn't leave temp stuff in
assert all([c.finish_reason != TEMP_STREAM_FINISH_REASON for c in chat_completion_response.choices])
assert all(
[
all([tc.id != TEMP_STREAM_TOOL_CALL_ID for tc in c.message.tool_calls]) if c.message.tool_calls else True
for c in chat_completion_response.choices
]
)
if not create_message_id:
assert chat_completion_response.id != dummy_message.id
# compute token usage before returning
# TODO try actually computing the #tokens instead of assuming the chunks is the same
chat_completion_response.usage.completion_tokens = n_chunks
chat_completion_response.usage.total_tokens = prompt_tokens + n_chunks
assert len(chat_completion_response.choices) > 0, chat_completion_response
# printd(chat_completion_response)
return chat_completion_response
def _sse_post(url: str, data: dict, headers: dict) -> Generator[ChatCompletionChunkResponse, None, None]:
with httpx.Client() as client:
with connect_sse(client, method="POST", url=url, json=data, headers=headers) as event_source:
# Inspect for errors before iterating (see https://github.com/florimondmanca/httpx-sse/pull/12)
if not event_source.response.is_success:
# handle errors
from letta.utils import printd
printd("Caught error before iterating SSE request:", vars(event_source.response))
printd(event_source.response.read())
try:
response_bytes = event_source.response.read()
response_dict = json.loads(response_bytes.decode("utf-8"))
error_message = response_dict["error"]["message"]
# e.g.: This model's maximum context length is 8192 tokens. However, your messages resulted in 8198 tokens (7450 in the messages, 748 in the functions). Please reduce the length of the messages or functions.
if OPENAI_CONTEXT_WINDOW_ERROR_SUBSTRING in error_message:
raise LLMError(error_message)
except LLMError:
raise
except:
print(f"Failed to parse SSE message, throwing SSE HTTP error up the stack")
event_source.response.raise_for_status()
try:
for sse in event_source.iter_sse():
# printd(sse.event, sse.data, sse.id, sse.retry)
if sse.data == OPENAI_SSE_DONE:
# print("finished")
break
else:
chunk_data = json.loads(sse.data)
# print("chunk_data::", chunk_data)
chunk_object = ChatCompletionChunkResponse(**chunk_data)
# print("chunk_object::", chunk_object)
# id=chunk_data["id"],
# choices=[ChunkChoice],
# model=chunk_data["model"],
# system_fingerprint=chunk_data["system_fingerprint"]
# )
yield chunk_object
except SSEError as e:
print("Caught an error while iterating the SSE stream:", str(e))
if "application/json" in str(e): # Check if the error is because of JSON response
# TODO figure out a better way to catch the error other than re-trying with a POST
response = client.post(url=url, json=data, headers=headers) # Make the request again to get the JSON response
if response.headers["Content-Type"].startswith("application/json"):
error_details = response.json() # Parse the JSON to get the error message
print("Request:", vars(response.request))
print("POST Error:", error_details)
print("Original SSE Error:", str(e))
else:
print("Failed to retrieve JSON error message via retry.")
else:
print("SSEError not related to 'application/json' content type.")
# Optionally re-raise the exception if you need to propagate it
raise e
except Exception as e:
if event_source.response.request is not None:
print("HTTP Request:", vars(event_source.response.request))
if event_source.response is not None:
print("HTTP Status:", event_source.response.status_code)
print("HTTP Headers:", event_source.response.headers)
# print("HTTP Body:", event_source.response.text)
print("Exception message:", str(e))
raise e
def openai_chat_completions_request_stream(
url: str,
api_key: str,
chat_completion_request: ChatCompletionRequest,
) -> Generator[ChatCompletionChunkResponse, None, None]:
from letta.utils import printd
url = smart_urljoin(url, "chat/completions")
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"}
data = chat_completion_request.model_dump(exclude_none=True)
printd("Request:\n", json.dumps(data, indent=2))
# If functions == None, strip from the payload
if "functions" in data and data["functions"] is None:
data.pop("functions")
data.pop("function_call", None) # extra safe, should exist always (default="auto")
if "tools" in data and data["tools"] is None:
data.pop("tools")
data.pop("tool_choice", None) # extra safe, should exist always (default="auto")
if "tools" in data:
for tool in data["tools"]:
# tool["strict"] = True
try:
tool["function"] = convert_to_structured_output(tool["function"])
except ValueError as e:
warnings.warn(f"Failed to convert tool function to structured output, tool={tool}, error={e}")
# print(f"\n\n\n\nData[tools]: {json.dumps(data['tools'], indent=2)}")
printd(f"Sending request to {url}")
try:
return _sse_post(url=url, data=data, headers=headers)
except requests.exceptions.HTTPError as http_err:
# Handle HTTP errors (e.g., response 4XX, 5XX)
printd(f"Got HTTPError, exception={http_err}, payload={data}")
raise http_err
except requests.exceptions.RequestException as req_err:
# Handle other requests-related errors (e.g., connection error)
printd(f"Got RequestException, exception={req_err}")
raise req_err
except Exception as e:
# Handle other potential errors
printd(f"Got unknown Exception, exception={e}")
raise e
def openai_chat_completions_request(
url: str,
api_key: str,
chat_completion_request: ChatCompletionRequest,
) -> ChatCompletionResponse:
"""Send a ChatCompletion request to an OpenAI-compatible server
If request.stream == True, will yield ChatCompletionChunkResponses
If request.stream == False, will return a ChatCompletionResponse
https://platform.openai.com/docs/guides/text-generation?lang=curl
"""
from letta.utils import printd
url = smart_urljoin(url, "chat/completions")
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"}
data = chat_completion_request.model_dump(exclude_none=True)
# add check otherwise will cause error: "Invalid value for 'parallel_tool_calls': 'parallel_tool_calls' is only allowed when 'tools' are specified."
if chat_completion_request.tools is not None:
data["parallel_tool_calls"] = False
printd("Request:\n", json.dumps(data, indent=2))
# If functions == None, strip from the payload
if "functions" in data and data["functions"] is None:
data.pop("functions")
data.pop("function_call", None) # extra safe, should exist always (default="auto")
if "tools" in data and data["tools"] is None:
data.pop("tools")
data.pop("tool_choice", None) # extra safe, should exist always (default="auto")
if "tools" in data:
for tool in data["tools"]:
try:
tool["function"] = convert_to_structured_output(tool["function"])
except ValueError as e:
warnings.warn(f"Failed to convert tool function to structured output, tool={tool}, error={e}")
response_json = make_post_request(url, headers, data)
return ChatCompletionResponse(**response_json)
def openai_embeddings_request(url: str, api_key: str, data: dict) -> EmbeddingResponse:
"""https://platform.openai.com/docs/api-reference/embeddings/create"""
url = smart_urljoin(url, "embeddings")
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"}
response_json = make_post_request(url, headers, data)
return EmbeddingResponse(**response_json)
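# Illustrative usage sketch (model name and input are placeholders):
#   resp = openai_embeddings_request(
#       url="https://api.openai.com/v1",
#       api_key="...",
#       data={"model": "text-embedding-3-small", "input": "hello world"},
#   )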