* fix(core): handle PermissionDeniedError in provider API key validation Fixed OpenAI PermissionDeniedError being raised as unknown error when validating provider API keys. The check_api_key methods in OpenAI-based providers (OpenAI, OpenRouter, Azure, Together) now properly catch and re-raise PermissionDeniedError as LLMPermissionDeniedError. 🐛 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> * fix(core): handle Unicode surrogates in OpenAI requests Sanitize invalid UTF-16 surrogates before sending requests to OpenAI API. Fixes UnicodeEncodeError when message content contains unpaired surrogates from corrupted emoji data or malformed Unicode sequences. 🐾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> * fix(core): handle MCP tool schema validation errors gracefully Catch fastmcp.exceptions.ToolError in execute_mcp_tool endpoint and convert to LettaInvalidArgumentError (400) instead of letting it propagate as 500 error. This is an expected user error when tool arguments don't match the MCP tool's schema. Fixes Datadog issue 8f2d874a-f8e5-11f0-9b25-da7ad0900000 🐾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> * fix(core): handle ExceptionGroup-wrapped ToolError in MCP executor When MCP tools fail with validation errors (e.g., missing required parameters), fastmcp raises ToolError exceptions that may be wrapped in ExceptionGroup by Python's async TaskGroup. The exception handler now unwraps single-exception groups before checking if the error should be handled gracefully. Fixes Calendly API "organization parameter missing" errors being logged to Datadog instead of returning friendly error messages to users. 🐾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> * fix: handle missing agent in create_conversation to prevent foreign key violation * Update .gitignore --------- Co-authored-by: Letta <noreply@letta.com>
1287 lines
57 KiB
Python
1287 lines
57 KiB
Python
import asyncio
|
|
import json
|
|
import os
|
|
import time
|
|
from typing import Any, List, Optional
|
|
|
|
import httpx
|
|
import openai
|
|
from openai import AsyncOpenAI, AsyncStream, OpenAI
|
|
from openai.types import Reasoning
|
|
from openai.types.chat.chat_completion import ChatCompletion
|
|
from openai.types.chat.chat_completion_chunk import ChatCompletionChunk
|
|
from openai.types.responses import ResponseTextConfigParam
|
|
from openai.types.responses.response_stream_event import ResponseStreamEvent
|
|
|
|
from letta.constants import LETTA_MODEL_ENDPOINT, REQUEST_HEARTBEAT_PARAM
|
|
from letta.errors import (
|
|
ContextWindowExceededError,
|
|
ErrorCode,
|
|
LLMAuthenticationError,
|
|
LLMBadRequestError,
|
|
LLMConnectionError,
|
|
LLMNotFoundError,
|
|
LLMPermissionDeniedError,
|
|
LLMRateLimitError,
|
|
LLMServerError,
|
|
LLMTimeoutError,
|
|
LLMUnprocessableEntityError,
|
|
)
|
|
from letta.helpers.json_helpers import sanitize_unicode_surrogates
|
|
from letta.llm_api.error_utils import is_context_window_overflow_message
|
|
from letta.llm_api.helpers import (
|
|
add_inner_thoughts_to_functions,
|
|
convert_response_format_to_responses_api,
|
|
unpack_all_inner_thoughts_from_kwargs,
|
|
)
|
|
from letta.llm_api.llm_client_base import LLMClientBase
|
|
from letta.local_llm.constants import INNER_THOUGHTS_KWARG, INNER_THOUGHTS_KWARG_DESCRIPTION, INNER_THOUGHTS_KWARG_DESCRIPTION_GO_FIRST
|
|
from letta.log import get_logger
|
|
from letta.otel.tracing import trace_method
|
|
from letta.schemas.agent import AgentType
|
|
from letta.schemas.embedding_config import EmbeddingConfig
|
|
from letta.schemas.letta_message_content import MessageContentType
|
|
from letta.schemas.llm_config import LLMConfig
|
|
from letta.schemas.message import Message as PydanticMessage
|
|
from letta.schemas.openai.chat_completion_request import (
|
|
ChatCompletionRequest,
|
|
FunctionCall as ToolFunctionChoiceFunctionCall,
|
|
FunctionSchema,
|
|
Tool as OpenAITool,
|
|
ToolFunctionChoice,
|
|
cast_message_to_subtype,
|
|
)
|
|
from letta.schemas.openai.chat_completion_response import (
|
|
ChatCompletionResponse,
|
|
Choice,
|
|
FunctionCall,
|
|
Message as ChoiceMessage,
|
|
ToolCall,
|
|
UsageStatistics,
|
|
)
|
|
from letta.schemas.openai.responses_request import ResponsesRequest
|
|
from letta.schemas.response_format import JsonSchemaResponseFormat
|
|
from letta.schemas.usage import LettaUsageStatistics
|
|
from letta.settings import model_settings
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
def sanitize_unicode_surrogates(obj: Any) -> Any:
|
|
"""Recursively sanitize invalid Unicode surrogates in strings within nested data structures.
|
|
|
|
This fixes UnicodeEncodeError when the OpenAI SDK tries to encode requests containing
|
|
unpaired UTF-16 surrogates (e.g., '\ud83c' without its pair) which can occur in corrupted
|
|
emoji data or malformed Unicode sequences.
|
|
|
|
Args:
|
|
obj: The object to sanitize (dict, list, str, or other types)
|
|
|
|
Returns:
|
|
The sanitized object with invalid surrogates replaced by the Unicode replacement character
|
|
"""
|
|
if isinstance(obj, dict):
|
|
return {k: sanitize_unicode_surrogates(v) for k, v in obj.items()}
|
|
elif isinstance(obj, list):
|
|
return [sanitize_unicode_surrogates(item) for item in obj]
|
|
elif isinstance(obj, str):
|
|
try:
|
|
obj.encode("utf-8")
|
|
return obj
|
|
except UnicodeEncodeError:
|
|
return obj.encode("utf-8", errors="replace").decode("utf-8")
|
|
else:
|
|
return obj
|
|
|
|
|
|
def is_openai_reasoning_model(model: str) -> bool:
|
|
"""Utility function to check if the model is a 'reasoner'"""
|
|
|
|
# NOTE: needs to be updated with new model releases
|
|
is_reasoning = model.startswith("o1") or model.startswith("o3") or model.startswith("o4") or model.startswith("gpt-5")
|
|
return is_reasoning
|
|
|
|
|
|
def does_not_support_minimal_reasoning(model: str) -> bool:
|
|
"""Check if the model does not support minimal reasoning effort.
|
|
|
|
Currently, models that contain codex don't support minimal reasoning.
|
|
"""
|
|
return "codex" in model.lower()
|
|
|
|
|
|
def supports_none_reasoning_effort(model: str) -> bool:
|
|
"""Check if the model supports 'none' reasoning effort.
|
|
|
|
Currently, GPT-5.1 and GPT-5.2 models support the 'none' reasoning effort level.
|
|
"""
|
|
return model.startswith("gpt-5.1") or model.startswith("gpt-5.2")
|
|
|
|
|
|
def is_openai_5_model(model: str) -> bool:
|
|
"""Utility function to check if the model is a '5' model"""
|
|
return model.startswith("gpt-5")
|
|
|
|
|
|
def supports_verbosity_control(model: str) -> bool:
|
|
"""Check if the model supports verbosity control, currently only GPT-5 models support this"""
|
|
return is_openai_5_model(model)
|
|
|
|
|
|
def accepts_developer_role(model: str) -> bool:
|
|
"""Checks if the model accepts the 'developer' role. Note that not all reasoning models accept this role.
|
|
|
|
See: https://community.openai.com/t/developer-role-not-accepted-for-o1-o1-mini-o3-mini/1110750/7
|
|
"""
|
|
if is_openai_reasoning_model(model) and "o1-mini" not in model or "o1-preview" in model:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
|
|
def supports_temperature_param(model: str) -> bool:
|
|
"""Certain OpenAI models don't support configuring the temperature.
|
|
|
|
Example error: 400 - {'error': {'message': "Unsupported parameter: 'temperature' is not supported with this model.", 'type': 'invalid_request_error', 'param': 'temperature', 'code': 'unsupported_parameter'}}
|
|
"""
|
|
if is_openai_reasoning_model(model) or is_openai_5_model(model):
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
|
|
def supports_parallel_tool_calling(model: str) -> bool:
|
|
"""Certain OpenAI models don't support parallel tool calls."""
|
|
|
|
if is_openai_reasoning_model(model):
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
|
|
# TODO move into LLMConfig as a field?
|
|
def supports_structured_output(llm_config: LLMConfig) -> bool:
|
|
"""Certain providers don't support structured output."""
|
|
|
|
# FIXME pretty hacky - turn off for providers we know users will use,
|
|
# but also don't support structured output
|
|
if llm_config.model_endpoint and "nebius.com" in llm_config.model_endpoint:
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
|
|
# TODO move into LLMConfig as a field?
|
|
def requires_auto_tool_choice(llm_config: LLMConfig) -> bool:
|
|
"""Certain providers require the tool choice to be set to 'auto'."""
|
|
if llm_config.model_endpoint and "nebius.com" in llm_config.model_endpoint:
|
|
return True
|
|
if llm_config.handle and "vllm" in llm_config.handle:
|
|
return True
|
|
if llm_config.compatibility_type == "mlx":
|
|
return True
|
|
return False
|
|
|
|
|
|
def use_responses_api(llm_config: LLMConfig) -> bool:
|
|
# TODO can opt in all reasoner models to use the Responses API
|
|
return is_openai_reasoning_model(llm_config.model)
|
|
|
|
|
|
def supports_content_none(llm_config: LLMConfig) -> bool:
|
|
"""Certain providers don't support the content None."""
|
|
if "gpt-oss" in llm_config.model:
|
|
return False
|
|
return True
|
|
|
|
|
|
class OpenAIClient(LLMClientBase):
|
|
def _prepare_client_kwargs(self, llm_config: LLMConfig) -> dict:
|
|
api_key, _, _ = self.get_byok_overrides(llm_config)
|
|
has_byok_key = api_key is not None # Track if we got a BYOK key
|
|
|
|
# Default to global OpenAI key when no BYOK override
|
|
if not api_key:
|
|
api_key = model_settings.openai_api_key or os.environ.get("OPENAI_API_KEY")
|
|
|
|
kwargs = {"api_key": api_key, "base_url": llm_config.model_endpoint}
|
|
|
|
# OpenRouter-specific overrides: use OpenRouter key and optional headers
|
|
is_openrouter = (llm_config.model_endpoint and "openrouter.ai" in llm_config.model_endpoint) or (
|
|
llm_config.provider_name == "openrouter"
|
|
)
|
|
if is_openrouter:
|
|
# Only use prod OpenRouter key if no BYOK key was provided
|
|
if not has_byok_key:
|
|
or_key = model_settings.openrouter_api_key or os.environ.get("OPENROUTER_API_KEY")
|
|
if or_key:
|
|
kwargs["api_key"] = or_key
|
|
# Attach optional headers if provided
|
|
headers = {}
|
|
if model_settings.openrouter_referer:
|
|
headers["HTTP-Referer"] = model_settings.openrouter_referer
|
|
if model_settings.openrouter_title:
|
|
headers["X-Title"] = model_settings.openrouter_title
|
|
if headers:
|
|
kwargs["default_headers"] = headers
|
|
|
|
# The OpenAI client requires some API key value
|
|
kwargs["api_key"] = kwargs.get("api_key") or "DUMMY_API_KEY"
|
|
|
|
return kwargs
|
|
|
|
def _prepare_client_kwargs_embedding(self, embedding_config: EmbeddingConfig) -> dict:
|
|
api_key = model_settings.openai_api_key or os.environ.get("OPENAI_API_KEY")
|
|
# supposedly the openai python client requires a dummy API key
|
|
api_key = api_key or "DUMMY_API_KEY"
|
|
kwargs = {"api_key": api_key, "base_url": embedding_config.embedding_endpoint}
|
|
return kwargs
|
|
|
|
async def _prepare_client_kwargs_async(self, llm_config: LLMConfig) -> dict:
|
|
api_key, _, _ = await self.get_byok_overrides_async(llm_config)
|
|
has_byok_key = api_key is not None # Track if we got a BYOK key
|
|
|
|
if not api_key:
|
|
api_key = model_settings.openai_api_key or os.environ.get("OPENAI_API_KEY")
|
|
kwargs = {"api_key": api_key, "base_url": llm_config.model_endpoint}
|
|
|
|
is_openrouter = (llm_config.model_endpoint and "openrouter.ai" in llm_config.model_endpoint) or (
|
|
llm_config.provider_name == "openrouter"
|
|
)
|
|
if is_openrouter:
|
|
# Only use prod OpenRouter key if no BYOK key was provided
|
|
if not has_byok_key:
|
|
or_key = model_settings.openrouter_api_key or os.environ.get("OPENROUTER_API_KEY")
|
|
if or_key:
|
|
kwargs["api_key"] = or_key
|
|
headers = {}
|
|
if model_settings.openrouter_referer:
|
|
headers["HTTP-Referer"] = model_settings.openrouter_referer
|
|
if model_settings.openrouter_title:
|
|
headers["X-Title"] = model_settings.openrouter_title
|
|
if headers:
|
|
kwargs["default_headers"] = headers
|
|
|
|
kwargs["api_key"] = kwargs.get("api_key") or "DUMMY_API_KEY"
|
|
|
|
return kwargs
|
|
|
|
def requires_auto_tool_choice(self, llm_config: LLMConfig) -> bool:
|
|
return requires_auto_tool_choice(llm_config)
|
|
|
|
def supports_structured_output(self, llm_config: LLMConfig) -> bool:
|
|
return supports_structured_output(llm_config)
|
|
|
|
@trace_method
|
|
def build_request_data_responses(
|
|
self,
|
|
agent_type: AgentType, # if react, use native content + strip heartbeats
|
|
messages: List[PydanticMessage],
|
|
llm_config: LLMConfig,
|
|
tools: Optional[List[dict]] = None, # Keep as dict for now as per base class
|
|
force_tool_call: Optional[str] = None,
|
|
requires_subsequent_tool_call: bool = False,
|
|
tool_return_truncation_chars: Optional[int] = None,
|
|
) -> dict:
|
|
"""
|
|
Constructs a request object in the expected data format for the OpenAI Responses API.
|
|
"""
|
|
if llm_config.put_inner_thoughts_in_kwargs:
|
|
raise ValueError("Inner thoughts in kwargs are not supported for the OpenAI Responses API")
|
|
|
|
openai_messages_list = PydanticMessage.to_openai_responses_dicts_from_list(
|
|
messages, tool_return_truncation_chars=tool_return_truncation_chars
|
|
)
|
|
# Add multi-modal support for Responses API by rewriting user messages
|
|
# into input_text/input_image parts.
|
|
openai_messages_list = fill_image_content_in_responses_input(openai_messages_list, messages)
|
|
|
|
if llm_config.model:
|
|
model = llm_config.model
|
|
else:
|
|
logger.warning(f"Model type not set in llm_config: {llm_config.model_dump_json(indent=4)}")
|
|
model = None
|
|
|
|
# Default to auto, unless there's a forced tool call coming from above or requires_subsequent_tool_call is True
|
|
tool_choice = None
|
|
if tools: # only set tool_choice if tools exist
|
|
if force_tool_call is not None:
|
|
tool_choice = {"type": "function", "name": force_tool_call}
|
|
elif requires_subsequent_tool_call:
|
|
tool_choice = "required"
|
|
else:
|
|
tool_choice = "auto"
|
|
|
|
# Convert the tools from the ChatCompletions style to the Responses style
|
|
if tools:
|
|
# Get proper typing
|
|
typed_tools: List[OpenAITool] = [OpenAITool(type="function", function=f) for f in tools]
|
|
|
|
# Strip request heartbeat
|
|
# TODO relax this?
|
|
if agent_type == AgentType.letta_v1_agent:
|
|
new_tools = []
|
|
for tool in typed_tools:
|
|
# Remove request_heartbeat from the properties if it exists
|
|
if tool.function.parameters and "properties" in tool.function.parameters:
|
|
tool.function.parameters["properties"].pop(REQUEST_HEARTBEAT_PARAM, None)
|
|
# Also remove from required list if present
|
|
if "required" in tool.function.parameters and REQUEST_HEARTBEAT_PARAM in tool.function.parameters["required"]:
|
|
tool.function.parameters["required"].remove(REQUEST_HEARTBEAT_PARAM)
|
|
new_tools.append(tool.model_copy(deep=True))
|
|
typed_tools = new_tools
|
|
|
|
# Note: Tools are already processed by enable_strict_mode() in the workflow/agent code
|
|
# (temporal_letta_v1_agent_workflow.py or letta_agent_v3.py) before reaching here.
|
|
# enable_strict_mode() handles: strict flag, additionalProperties, required array, nullable fields
|
|
# Convert to a Responses-friendly dict, preserving the strict setting from the tool schema
|
|
responses_tools = [
|
|
{
|
|
"type": "function",
|
|
"name": t.function.name,
|
|
"description": t.function.description,
|
|
"parameters": t.function.parameters,
|
|
"strict": t.function.strict,
|
|
}
|
|
for t in typed_tools
|
|
]
|
|
else:
|
|
responses_tools = None
|
|
|
|
# Prepare the request payload
|
|
data = ResponsesRequest(
|
|
# Responses specific
|
|
store=False,
|
|
include=["reasoning.encrypted_content"],
|
|
# More or less generic to ChatCompletions API
|
|
model=model,
|
|
input=openai_messages_list,
|
|
tools=responses_tools,
|
|
tool_choice=tool_choice,
|
|
max_output_tokens=llm_config.max_tokens,
|
|
temperature=llm_config.temperature if supports_temperature_param(model) else None,
|
|
parallel_tool_calls=llm_config.parallel_tool_calls if tools and supports_parallel_tool_calling(model) else False,
|
|
)
|
|
|
|
# Handle text configuration (verbosity and response format)
|
|
text_config_kwargs = {}
|
|
|
|
# Add verbosity control for GPT-5 models
|
|
if supports_verbosity_control(model) and llm_config.verbosity:
|
|
text_config_kwargs["verbosity"] = llm_config.verbosity
|
|
|
|
# Add response_format support for structured outputs via text.format
|
|
if hasattr(llm_config, "response_format") and llm_config.response_format is not None:
|
|
format_dict = convert_response_format_to_responses_api(llm_config.response_format)
|
|
if format_dict is not None:
|
|
text_config_kwargs["format"] = format_dict
|
|
|
|
# Set text config if we have any parameters
|
|
if text_config_kwargs:
|
|
data.text = ResponseTextConfigParam(**text_config_kwargs)
|
|
|
|
# Add reasoning effort control for reasoning models
|
|
# Only set reasoning if effort is not "none" (GPT-5.1 uses "none" to disable reasoning)
|
|
if is_openai_reasoning_model(model) and llm_config.reasoning_effort and llm_config.reasoning_effort != "none":
|
|
# data.reasoning_effort = llm_config.reasoning_effort
|
|
data.reasoning = Reasoning(
|
|
effort=llm_config.reasoning_effort,
|
|
# NOTE: hardcoding summary level, could put in llm_config?
|
|
summary="detailed",
|
|
)
|
|
|
|
# TODO I don't see this in Responses?
|
|
# Add frequency penalty
|
|
# if llm_config.frequency_penalty is not None:
|
|
# data.frequency_penalty = llm_config.frequency_penalty
|
|
|
|
# Add parallel tool calling
|
|
if tools and supports_parallel_tool_calling(model):
|
|
data.parallel_tool_calls = llm_config.parallel_tool_calls
|
|
|
|
# always set user id for openai requests
|
|
if self.actor:
|
|
data.user = self.actor.id
|
|
|
|
if llm_config.model_endpoint == LETTA_MODEL_ENDPOINT:
|
|
if not self.actor:
|
|
# override user id for inference.letta.com
|
|
import uuid
|
|
|
|
data.user = str(uuid.UUID(int=0))
|
|
|
|
data.model = "memgpt-openai"
|
|
|
|
request_data = data.model_dump(exclude_unset=True)
|
|
# print("responses request data", request_data)
|
|
return request_data
|
|
|
|
@trace_method
|
|
def build_request_data(
|
|
self,
|
|
agent_type: AgentType, # if react, use native content + strip heartbeats
|
|
messages: List[PydanticMessage],
|
|
llm_config: LLMConfig,
|
|
tools: Optional[List[dict]] = None, # Keep as dict for now as per base class
|
|
force_tool_call: Optional[str] = None,
|
|
requires_subsequent_tool_call: bool = False,
|
|
tool_return_truncation_chars: Optional[int] = None,
|
|
) -> dict:
|
|
"""
|
|
Constructs a request object in the expected data format for the OpenAI API.
|
|
"""
|
|
# Shortcut for GPT-5 to use Responses API, but only for letta_v1_agent
|
|
if use_responses_api(llm_config) and agent_type == AgentType.letta_v1_agent:
|
|
return self.build_request_data_responses(
|
|
agent_type=agent_type,
|
|
messages=messages,
|
|
llm_config=llm_config,
|
|
tools=tools,
|
|
force_tool_call=force_tool_call,
|
|
requires_subsequent_tool_call=requires_subsequent_tool_call,
|
|
tool_return_truncation_chars=tool_return_truncation_chars,
|
|
)
|
|
|
|
if agent_type == AgentType.letta_v1_agent:
|
|
# Safety hard override in case it got set somewhere by accident
|
|
llm_config.put_inner_thoughts_in_kwargs = False
|
|
|
|
if tools and llm_config.put_inner_thoughts_in_kwargs:
|
|
# Special case for LM Studio backend since it needs extra guidance to force out the thoughts first
|
|
# TODO(fix)
|
|
inner_thoughts_desc = (
|
|
INNER_THOUGHTS_KWARG_DESCRIPTION_GO_FIRST
|
|
if llm_config.model_endpoint and ":1234" in llm_config.model_endpoint
|
|
else INNER_THOUGHTS_KWARG_DESCRIPTION
|
|
)
|
|
tools = add_inner_thoughts_to_functions(
|
|
functions=tools,
|
|
inner_thoughts_key=INNER_THOUGHTS_KWARG,
|
|
inner_thoughts_description=inner_thoughts_desc,
|
|
put_inner_thoughts_first=True,
|
|
)
|
|
|
|
use_developer_message = accepts_developer_role(llm_config.model)
|
|
|
|
openai_message_list = [
|
|
cast_message_to_subtype(m)
|
|
for m in PydanticMessage.to_openai_dicts_from_list(
|
|
messages,
|
|
put_inner_thoughts_in_kwargs=llm_config.put_inner_thoughts_in_kwargs,
|
|
use_developer_message=use_developer_message,
|
|
tool_return_truncation_chars=tool_return_truncation_chars,
|
|
)
|
|
]
|
|
|
|
if llm_config.model:
|
|
model = llm_config.model
|
|
else:
|
|
logger.warning(f"Model type not set in llm_config: {llm_config.model_dump_json(indent=4)}")
|
|
model = None
|
|
|
|
# TODO: we may need to extend this to more models using proxy?
|
|
is_openrouter = (llm_config.model_endpoint and "openrouter.ai" in llm_config.model_endpoint) or (
|
|
llm_config.provider_name == "openrouter"
|
|
)
|
|
if is_openrouter:
|
|
try:
|
|
model = llm_config.handle.split("/", 1)[-1]
|
|
except:
|
|
# don't raise error since this isn't robust against edge cases
|
|
pass
|
|
|
|
# force function calling for reliability, see https://platform.openai.com/docs/api-reference/chat/create#chat-create-tool_choice
|
|
# TODO(matt) move into LLMConfig
|
|
# TODO: This vllm checking is very brittle and is a patch at most
|
|
tool_choice = None
|
|
if tools: # only set tool_choice if tools exist
|
|
if force_tool_call is not None:
|
|
# OpenRouter proxies to providers that may not support object-format tool_choice
|
|
# Use "required" instead which achieves similar effect
|
|
if is_openrouter:
|
|
tool_choice = "required"
|
|
else:
|
|
tool_choice = ToolFunctionChoice(type="function", function=ToolFunctionChoiceFunctionCall(name=force_tool_call))
|
|
elif requires_subsequent_tool_call:
|
|
tool_choice = "required"
|
|
elif self.requires_auto_tool_choice(llm_config) or agent_type == AgentType.letta_v1_agent:
|
|
tool_choice = "auto"
|
|
else:
|
|
# only set if tools is non-Null
|
|
tool_choice = "required"
|
|
|
|
if not supports_content_none(llm_config):
|
|
for message in openai_message_list:
|
|
if message.content is None:
|
|
message.content = ""
|
|
|
|
data = ChatCompletionRequest(
|
|
model=model,
|
|
messages=fill_image_content_in_messages(openai_message_list, messages),
|
|
tools=[OpenAITool(type="function", function=f) for f in tools] if tools else None,
|
|
tool_choice=tool_choice,
|
|
user=str(),
|
|
max_completion_tokens=llm_config.max_tokens,
|
|
# NOTE: the reasoners that don't support temperature require 1.0, not None
|
|
temperature=llm_config.temperature if supports_temperature_param(model) else 1.0,
|
|
)
|
|
|
|
# Add verbosity control for GPT-5 models
|
|
if supports_verbosity_control(model) and llm_config.verbosity:
|
|
data.verbosity = llm_config.verbosity
|
|
|
|
# Add reasoning effort control for reasoning models
|
|
# Only set reasoning_effort if it's not "none" (GPT-5.1 uses "none" to disable reasoning)
|
|
if is_openai_reasoning_model(model) and llm_config.reasoning_effort and llm_config.reasoning_effort != "none":
|
|
data.reasoning_effort = llm_config.reasoning_effort
|
|
|
|
if llm_config.frequency_penalty is not None:
|
|
data.frequency_penalty = llm_config.frequency_penalty
|
|
|
|
if tools and supports_parallel_tool_calling(model):
|
|
data.parallel_tool_calls = False
|
|
|
|
# Add response_format support for structured outputs
|
|
if hasattr(llm_config, "response_format") and llm_config.response_format is not None:
|
|
# For Chat Completions API, we need the full nested structure
|
|
if isinstance(llm_config.response_format, JsonSchemaResponseFormat):
|
|
# Convert to the OpenAI SDK format
|
|
data.response_format = {"type": "json_schema", "json_schema": llm_config.response_format.json_schema}
|
|
else:
|
|
# For text or json_object, just pass the type
|
|
data.response_format = {"type": llm_config.response_format.type}
|
|
|
|
# always set user id for openai requests
|
|
if self.actor:
|
|
data.user = self.actor.id
|
|
|
|
if llm_config.model_endpoint == LETTA_MODEL_ENDPOINT:
|
|
if not self.actor:
|
|
# override user id for inference.letta.com
|
|
import uuid
|
|
|
|
data.user = str(uuid.UUID(int=0))
|
|
|
|
data.model = "memgpt-openai"
|
|
|
|
# For some reason, request heartbeats are still leaking into here...
|
|
# So strip them manually for v3
|
|
if agent_type == AgentType.letta_v1_agent:
|
|
new_tools = []
|
|
if data.tools:
|
|
for tool in data.tools:
|
|
# Remove request_heartbeat from the properties if it exists
|
|
if tool.function.parameters and "properties" in tool.function.parameters:
|
|
tool.function.parameters["properties"].pop(REQUEST_HEARTBEAT_PARAM, None)
|
|
# Also remove from required list if present
|
|
if "required" in tool.function.parameters and REQUEST_HEARTBEAT_PARAM in tool.function.parameters["required"]:
|
|
tool.function.parameters["required"].remove(REQUEST_HEARTBEAT_PARAM)
|
|
new_tools.append(tool.model_copy(deep=True))
|
|
data.tools = new_tools
|
|
|
|
# Note: Tools are already processed by enable_strict_mode() in the workflow/agent code
|
|
# (temporal_letta_v1_agent_workflow.py or letta_agent_v3.py) before reaching here.
|
|
# enable_strict_mode() handles: strict flag, additionalProperties, required array, nullable fields
|
|
# We only need to ensure strict is False for providers that don't support structured output
|
|
if data.tools is not None and len(data.tools) > 0:
|
|
for tool in data.tools:
|
|
if not supports_structured_output(llm_config):
|
|
# Provider doesn't support structured output - ensure strict is False
|
|
tool.function.strict = False
|
|
request_data = data.model_dump(exclude_unset=True)
|
|
|
|
# If Ollama
|
|
# if llm_config.handle.startswith("ollama/") and llm_config.enable_reasoner:
|
|
# Sadly, reasoning via the OpenAI proxy on Ollama only works for Harmony/gpt-oss
|
|
# Ollama's OpenAI layer simply looks for the presence of 'reasoining' or 'reasoning_effort'
|
|
# If set, then in the backend "medium" thinking is turned on
|
|
# request_data["reasoning_effort"] = "medium"
|
|
|
|
# Add OpenRouter reasoning configuration via extra_body
|
|
if is_openrouter and llm_config.enable_reasoner:
|
|
reasoning_config = {}
|
|
if llm_config.reasoning_effort:
|
|
reasoning_config["effort"] = llm_config.reasoning_effort
|
|
if llm_config.max_reasoning_tokens and llm_config.max_reasoning_tokens > 0:
|
|
reasoning_config["max_tokens"] = llm_config.max_reasoning_tokens
|
|
if not reasoning_config:
|
|
reasoning_config = {"enabled": True}
|
|
request_data["extra_body"] = {"reasoning": reasoning_config}
|
|
|
|
return request_data
|
|
|
|
@trace_method
|
|
def request(self, request_data: dict, llm_config: LLMConfig) -> dict:
|
|
"""
|
|
Performs underlying synchronous request to OpenAI API and returns raw response dict.
|
|
"""
|
|
# Sanitize Unicode surrogates to prevent encoding errors
|
|
request_data = sanitize_unicode_surrogates(request_data)
|
|
|
|
client = OpenAI(**self._prepare_client_kwargs(llm_config))
|
|
# Route based on payload shape: Responses uses 'input', Chat Completions uses 'messages'
|
|
if "input" in request_data and "messages" not in request_data:
|
|
resp = client.responses.create(**request_data)
|
|
return resp.model_dump()
|
|
else:
|
|
response: ChatCompletion = client.chat.completions.create(**request_data)
|
|
return response.model_dump()
|
|
|
|
@trace_method
|
|
async def request_async(self, request_data: dict, llm_config: LLMConfig) -> dict:
|
|
"""
|
|
Performs underlying asynchronous request to OpenAI API and returns raw response dict.
|
|
"""
|
|
# Sanitize Unicode surrogates to prevent encoding errors
|
|
request_data = sanitize_unicode_surrogates(request_data)
|
|
|
|
kwargs = await self._prepare_client_kwargs_async(llm_config)
|
|
client = AsyncOpenAI(**kwargs)
|
|
# Route based on payload shape: Responses uses 'input', Chat Completions uses 'messages'
|
|
if "input" in request_data and "messages" not in request_data:
|
|
resp = await client.responses.create(**request_data)
|
|
return resp.model_dump()
|
|
else:
|
|
response: ChatCompletion = await client.chat.completions.create(**request_data)
|
|
return response.model_dump()
|
|
|
|
def is_reasoning_model(self, llm_config: LLMConfig) -> bool:
|
|
return is_openai_reasoning_model(llm_config.model)
|
|
|
|
def extract_usage_statistics(self, response_data: dict | None, llm_config: LLMConfig) -> LettaUsageStatistics:
|
|
"""Extract usage statistics from OpenAI response and return as LettaUsageStatistics."""
|
|
if not response_data:
|
|
return LettaUsageStatistics()
|
|
|
|
# Handle Responses API format (used by reasoning models like o1/o3)
|
|
if response_data.get("object") == "response":
|
|
usage = response_data.get("usage", {}) or {}
|
|
prompt_tokens = usage.get("input_tokens") or 0
|
|
completion_tokens = usage.get("output_tokens") or 0
|
|
total_tokens = usage.get("total_tokens") or (prompt_tokens + completion_tokens)
|
|
|
|
input_details = usage.get("input_tokens_details", {}) or {}
|
|
cached_tokens = input_details.get("cached_tokens")
|
|
|
|
output_details = usage.get("output_tokens_details", {}) or {}
|
|
reasoning_tokens = output_details.get("reasoning_tokens")
|
|
|
|
return LettaUsageStatistics(
|
|
prompt_tokens=prompt_tokens,
|
|
completion_tokens=completion_tokens,
|
|
total_tokens=total_tokens,
|
|
cached_input_tokens=cached_tokens,
|
|
reasoning_tokens=reasoning_tokens,
|
|
)
|
|
|
|
# Handle standard Chat Completions API format using pydantic models
|
|
from openai.types.chat import ChatCompletion
|
|
|
|
try:
|
|
completion = ChatCompletion.model_validate(response_data)
|
|
except Exception:
|
|
return LettaUsageStatistics()
|
|
|
|
if not completion.usage:
|
|
return LettaUsageStatistics()
|
|
|
|
usage = completion.usage
|
|
prompt_tokens = usage.prompt_tokens or 0
|
|
completion_tokens = usage.completion_tokens or 0
|
|
total_tokens = usage.total_tokens or (prompt_tokens + completion_tokens)
|
|
|
|
# Extract cached tokens from prompt_tokens_details
|
|
cached_tokens = None
|
|
if usage.prompt_tokens_details:
|
|
cached_tokens = usage.prompt_tokens_details.cached_tokens
|
|
|
|
# Extract reasoning tokens from completion_tokens_details
|
|
reasoning_tokens = None
|
|
if usage.completion_tokens_details:
|
|
reasoning_tokens = usage.completion_tokens_details.reasoning_tokens
|
|
|
|
return LettaUsageStatistics(
|
|
prompt_tokens=prompt_tokens,
|
|
completion_tokens=completion_tokens,
|
|
total_tokens=total_tokens,
|
|
cached_input_tokens=cached_tokens,
|
|
reasoning_tokens=reasoning_tokens,
|
|
)
|
|
|
|
@trace_method
|
|
async def convert_response_to_chat_completion(
|
|
self,
|
|
response_data: dict,
|
|
input_messages: List[PydanticMessage], # Included for consistency, maybe used later
|
|
llm_config: LLMConfig,
|
|
) -> ChatCompletionResponse:
|
|
"""
|
|
Converts raw OpenAI response dict into the ChatCompletionResponse Pydantic model.
|
|
Handles potential extraction of inner thoughts if they were added via kwargs.
|
|
"""
|
|
if "object" in response_data and response_data["object"] == "response":
|
|
# Map Responses API shape to Chat Completions shape
|
|
# See example payload in tests/integration_test_send_message_v2.py
|
|
model = response_data.get("model")
|
|
|
|
# Extract usage via centralized method
|
|
from letta.schemas.enums import ProviderType
|
|
|
|
usage_stats = self.extract_usage_statistics(response_data, llm_config).to_usage(ProviderType.openai)
|
|
|
|
# Extract assistant message text from the outputs list
|
|
outputs = response_data.get("output") or []
|
|
assistant_text_parts = []
|
|
reasoning_summary_parts = None
|
|
reasoning_content_signature = None
|
|
tool_calls = None
|
|
|
|
# Check for incomplete_details first (e.g., max_output_tokens reached)
|
|
incomplete_details = response_data.get("incomplete_details")
|
|
if incomplete_details and incomplete_details.get("reason") == "max_output_tokens":
|
|
finish_reason = "length"
|
|
elif response_data.get("status") == "completed":
|
|
finish_reason = "stop"
|
|
else:
|
|
finish_reason = None
|
|
|
|
# Optionally capture reasoning presence
|
|
found_reasoning = False
|
|
for out in outputs:
|
|
out_type = (out or {}).get("type")
|
|
if out_type == "message":
|
|
content_list = (out or {}).get("content") or []
|
|
for part in content_list:
|
|
if (part or {}).get("type") == "output_text":
|
|
text_val = (part or {}).get("text")
|
|
if text_val:
|
|
assistant_text_parts.append(text_val)
|
|
elif out_type == "reasoning":
|
|
found_reasoning = True
|
|
reasoning_summary_parts = [part.get("text") for part in out.get("summary")]
|
|
reasoning_content_signature = out.get("encrypted_content")
|
|
elif out_type == "function_call":
|
|
tool_calls = [
|
|
ToolCall(
|
|
id=out.get("call_id"),
|
|
type="function",
|
|
function=FunctionCall(
|
|
name=out.get("name"),
|
|
arguments=out.get("arguments"),
|
|
),
|
|
)
|
|
]
|
|
|
|
assistant_text = "\n".join(assistant_text_parts) if assistant_text_parts else None
|
|
|
|
# Build ChatCompletionResponse-compatible structure
|
|
# Imports for these Pydantic models are already present in this module
|
|
choice = Choice(
|
|
index=0,
|
|
finish_reason=finish_reason,
|
|
message=ChoiceMessage(
|
|
role="assistant",
|
|
content=assistant_text or "",
|
|
reasoning_content="\n".join(reasoning_summary_parts) if reasoning_summary_parts else None,
|
|
reasoning_content_signature=reasoning_content_signature if reasoning_summary_parts else None,
|
|
redacted_reasoning_content=None,
|
|
omitted_reasoning_content=False,
|
|
tool_calls=tool_calls,
|
|
),
|
|
)
|
|
|
|
chat_completion_response = ChatCompletionResponse(
|
|
id=response_data.get("id", ""),
|
|
choices=[choice],
|
|
created=int(response_data.get("created_at") or 0),
|
|
model=model or (llm_config.model if hasattr(llm_config, "model") else None),
|
|
usage=usage_stats,
|
|
)
|
|
|
|
return chat_completion_response
|
|
|
|
# OpenAI's response structure directly maps to ChatCompletionResponse
|
|
# We just need to instantiate the Pydantic model for validation and type safety.
|
|
chat_completion_response = ChatCompletionResponse(**response_data)
|
|
chat_completion_response = self._fix_truncated_json_response(chat_completion_response)
|
|
|
|
# Parse reasoning_content from vLLM/OpenRouter/OpenAI proxies that return this field
|
|
# This handles cases where the proxy returns .reasoning_content in the response
|
|
if (
|
|
chat_completion_response.choices
|
|
and len(chat_completion_response.choices) > 0
|
|
and chat_completion_response.choices[0].message
|
|
and not chat_completion_response.choices[0].message.reasoning_content
|
|
):
|
|
if "choices" in response_data and len(response_data["choices"]) > 0:
|
|
choice_data = response_data["choices"][0]
|
|
message_data = choice_data.get("message", {})
|
|
# Check for reasoning_content (standard) or reasoning (OpenRouter)
|
|
reasoning_content = message_data.get("reasoning_content") or message_data.get("reasoning")
|
|
if reasoning_content:
|
|
chat_completion_response.choices[0].message.reasoning_content = reasoning_content
|
|
chat_completion_response.choices[0].message.reasoning_content_signature = None
|
|
|
|
# Unpack inner thoughts if they were embedded in function arguments
|
|
if llm_config.put_inner_thoughts_in_kwargs:
|
|
chat_completion_response = unpack_all_inner_thoughts_from_kwargs(
|
|
response=chat_completion_response, inner_thoughts_key=INNER_THOUGHTS_KWARG
|
|
)
|
|
|
|
# If we used a reasoning model, create a content part for the ommitted reasoning
|
|
if self.is_reasoning_model(llm_config):
|
|
chat_completion_response.choices[0].message.omitted_reasoning_content = True
|
|
|
|
return chat_completion_response
|
|
|
|
@trace_method
|
|
async def stream_async(self, request_data: dict, llm_config: LLMConfig) -> AsyncStream[ChatCompletionChunk | ResponseStreamEvent]:
|
|
"""
|
|
Performs underlying asynchronous streaming request to OpenAI and returns the async stream iterator.
|
|
"""
|
|
# Sanitize Unicode surrogates to prevent encoding errors
|
|
request_data = sanitize_unicode_surrogates(request_data)
|
|
|
|
kwargs = await self._prepare_client_kwargs_async(llm_config)
|
|
client = AsyncOpenAI(**kwargs)
|
|
|
|
# Route based on payload shape: Responses uses 'input', Chat Completions uses 'messages'
|
|
if "input" in request_data and "messages" not in request_data:
|
|
try:
|
|
response_stream: AsyncStream[ResponseStreamEvent] = await client.responses.create(
|
|
**request_data,
|
|
stream=True,
|
|
# stream_options={"include_usage": True},
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error streaming OpenAI Responses request: {e} with request data: {json.dumps(request_data)}")
|
|
raise e
|
|
else:
|
|
try:
|
|
response_stream: AsyncStream[ChatCompletionChunk] = await client.chat.completions.create(
|
|
**request_data,
|
|
stream=True,
|
|
stream_options={"include_usage": True},
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error streaming OpenAI Chat Completions request: {e} with request data: {json.dumps(request_data)}")
|
|
raise e
|
|
return response_stream
|
|
|
|
@trace_method
|
|
async def stream_async_responses(self, request_data: dict, llm_config: LLMConfig) -> AsyncStream[ResponseStreamEvent]:
|
|
"""
|
|
Performs underlying asynchronous streaming request to OpenAI and returns the async stream iterator.
|
|
"""
|
|
# Sanitize Unicode surrogates to prevent encoding errors
|
|
request_data = sanitize_unicode_surrogates(request_data)
|
|
|
|
kwargs = await self._prepare_client_kwargs_async(llm_config)
|
|
client = AsyncOpenAI(**kwargs)
|
|
response_stream: AsyncStream[ResponseStreamEvent] = await client.responses.create(**request_data, stream=True)
|
|
return response_stream
|
|
|
|
@trace_method
|
|
async def request_embeddings(self, inputs: List[str], embedding_config: EmbeddingConfig) -> List[List[float]]:
|
|
"""Request embeddings given texts and embedding config with chunking and retry logic
|
|
|
|
Retry strategy prioritizes reducing batch size before chunk size to maintain retrieval quality:
|
|
1. Start with batch_size=2048 (texts per request)
|
|
2. On failure, halve batch_size until it reaches 1
|
|
3. Only then start reducing chunk_size (for very large individual texts)
|
|
"""
|
|
if not inputs:
|
|
return []
|
|
|
|
request_start = time.time()
|
|
logger.info(f"DIAGNOSTIC: request_embeddings called with {len(inputs)} inputs, model={embedding_config.embedding_model}")
|
|
|
|
# Validate inputs - OpenAI rejects empty strings or non-string values
|
|
# See: https://community.openai.com/t/embedding-api-change-input-is-invalid/707490/7
|
|
valid_inputs = []
|
|
input_index_map = [] # Map valid input index back to original index
|
|
|
|
for idx, inp in enumerate(inputs):
|
|
if not isinstance(inp, str):
|
|
logger.error(f"Invalid input at index {idx}: type={type(inp)}, value={inp}")
|
|
raise ValueError(f"Input at index {idx} is not a string: {type(inp)}")
|
|
if not inp or not inp.strip():
|
|
logger.warning(f"Empty or whitespace-only input at index {idx}, replacing with placeholder")
|
|
# Replace empty strings with placeholder to avoid API rejection
|
|
valid_inputs.append(" ")
|
|
input_index_map.append(idx)
|
|
else:
|
|
valid_inputs.append(inp)
|
|
input_index_map.append(idx)
|
|
|
|
if not valid_inputs:
|
|
logger.error("All inputs are empty after validation")
|
|
raise ValueError("Cannot request embeddings for empty inputs")
|
|
|
|
# Use valid_inputs instead of inputs for processing
|
|
inputs = valid_inputs
|
|
|
|
kwargs = self._prepare_client_kwargs_embedding(embedding_config)
|
|
client = AsyncOpenAI(**kwargs)
|
|
|
|
# track results by original index to maintain order
|
|
results = [None] * len(inputs)
|
|
initial_batch_size = 2048
|
|
chunks_to_process = [(i, inputs[i : i + initial_batch_size], initial_batch_size) for i in range(0, len(inputs), initial_batch_size)]
|
|
min_chunk_size = 128
|
|
|
|
while chunks_to_process:
|
|
tasks = []
|
|
task_metadata = []
|
|
|
|
for start_idx, chunk_inputs, current_batch_size in chunks_to_process:
|
|
if not chunk_inputs:
|
|
logger.warning(f"Skipping empty chunk at start_idx={start_idx}")
|
|
continue
|
|
|
|
logger.info(
|
|
f"DIAGNOSTIC: Creating embedding task: start_idx={start_idx}, batch_size={len(chunk_inputs)}, "
|
|
f"first_input_len={len(chunk_inputs[0]) if chunk_inputs else 0}, "
|
|
f"model={embedding_config.embedding_model}"
|
|
)
|
|
task = client.embeddings.create(model=embedding_config.embedding_model, input=chunk_inputs)
|
|
tasks.append(task)
|
|
task_metadata.append((start_idx, chunk_inputs, current_batch_size))
|
|
|
|
if not tasks:
|
|
logger.warning("All chunks were empty, skipping embedding request")
|
|
break
|
|
|
|
gather_start = time.time()
|
|
logger.info(f"DIAGNOSTIC: Awaiting {len(tasks)} embedding API calls...")
|
|
task_results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
gather_duration = time.time() - gather_start
|
|
|
|
if gather_duration > 1.0:
|
|
logger.warning(f"DIAGNOSTIC: SLOW embedding API gather took {gather_duration:.2f}s for {len(tasks)} tasks")
|
|
else:
|
|
logger.info(f"DIAGNOSTIC: Embedding API gather completed in {gather_duration:.2f}s")
|
|
|
|
failed_chunks = []
|
|
for (start_idx, chunk_inputs, current_batch_size), result in zip(task_metadata, task_results):
|
|
if isinstance(result, Exception):
|
|
current_size = len(chunk_inputs)
|
|
|
|
if current_batch_size > 1:
|
|
new_batch_size = max(1, current_batch_size // 2)
|
|
logger.warning(
|
|
f"Embeddings request failed for batch starting at {start_idx} with size {current_size}. "
|
|
f"Reducing batch size from {current_batch_size} to {new_batch_size} and retrying."
|
|
)
|
|
mid = max(1, len(chunk_inputs) // 2)
|
|
if chunk_inputs[:mid]:
|
|
failed_chunks.append((start_idx, chunk_inputs[:mid], new_batch_size))
|
|
if chunk_inputs[mid:]:
|
|
failed_chunks.append((start_idx + mid, chunk_inputs[mid:], new_batch_size))
|
|
elif current_size > min_chunk_size:
|
|
logger.warning(
|
|
f"Embeddings request failed for single item at {start_idx} with size {current_size}. "
|
|
f"Splitting individual text content and retrying."
|
|
)
|
|
mid = max(1, len(chunk_inputs) // 2)
|
|
if chunk_inputs[:mid]:
|
|
failed_chunks.append((start_idx, chunk_inputs[:mid], 1))
|
|
if chunk_inputs[mid:]:
|
|
failed_chunks.append((start_idx + mid, chunk_inputs[mid:], 1))
|
|
else:
|
|
chunk_preview = str(chunk_inputs)[:500] if chunk_inputs else "None"
|
|
logger.error(
|
|
f"Failed to get embeddings for chunk starting at {start_idx} even with batch_size=1 "
|
|
f"and minimum chunk size {min_chunk_size}. Error: {result}. "
|
|
f"Chunk preview (first 500 chars): {chunk_preview}"
|
|
)
|
|
raise result
|
|
else:
|
|
embeddings = [r.embedding for r in result.data]
|
|
for i, embedding in enumerate(embeddings):
|
|
results[start_idx + i] = embedding
|
|
|
|
chunks_to_process = failed_chunks
|
|
|
|
total_duration = time.time() - request_start
|
|
if total_duration > 2.0:
|
|
logger.error(f"DIAGNOSTIC: BLOCKING DETECTED - request_embeddings took {total_duration:.2f}s for {len(inputs)} inputs")
|
|
elif total_duration > 1.0:
|
|
logger.warning(f"DIAGNOSTIC: Slow request_embeddings took {total_duration:.2f}s for {len(inputs)} inputs")
|
|
else:
|
|
logger.info(f"DIAGNOSTIC: request_embeddings completed in {total_duration:.2f}s")
|
|
|
|
return results
|
|
|
|
@trace_method
|
|
def handle_llm_error(self, e: Exception) -> Exception:
|
|
"""
|
|
Maps OpenAI-specific errors to common LLMError types.
|
|
"""
|
|
if isinstance(e, openai.APITimeoutError):
|
|
timeout_duration = getattr(e, "timeout", "unknown")
|
|
logger.warning(f"[OpenAI] Request timeout after {timeout_duration} seconds: {e}")
|
|
return LLMTimeoutError(
|
|
message=f"Request to OpenAI timed out: {str(e)}",
|
|
code=ErrorCode.TIMEOUT,
|
|
details={
|
|
"timeout_duration": timeout_duration,
|
|
"cause": str(e.__cause__) if e.__cause__ else None,
|
|
},
|
|
)
|
|
|
|
if isinstance(e, openai.APIConnectionError):
|
|
logger.warning(f"[OpenAI] API connection error: {e}")
|
|
return LLMConnectionError(
|
|
message=f"Failed to connect to OpenAI: {str(e)}",
|
|
code=ErrorCode.INTERNAL_SERVER_ERROR,
|
|
details={"cause": str(e.__cause__) if e.__cause__ else None},
|
|
)
|
|
|
|
# Handle httpx.RemoteProtocolError which can occur during streaming
|
|
# when the remote server closes the connection unexpectedly
|
|
# (e.g., "peer closed connection without sending complete message body")
|
|
if isinstance(e, httpx.RemoteProtocolError):
|
|
logger.warning(f"[OpenAI] Remote protocol error during streaming: {e}")
|
|
return LLMConnectionError(
|
|
message=f"Connection error during OpenAI streaming: {str(e)}",
|
|
code=ErrorCode.INTERNAL_SERVER_ERROR,
|
|
details={"cause": str(e.__cause__) if e.__cause__ else None},
|
|
)
|
|
|
|
# Handle httpx network errors which can occur during streaming
|
|
# when the connection is unexpectedly closed while reading/writing
|
|
if isinstance(e, (httpx.ReadError, httpx.WriteError, httpx.ConnectError)):
|
|
logger.warning(f"[OpenAI] Network error during streaming: {type(e).__name__}: {e}")
|
|
return LLMConnectionError(
|
|
message=f"Network error during OpenAI streaming: {str(e)}",
|
|
code=ErrorCode.INTERNAL_SERVER_ERROR,
|
|
details={"cause": str(e.__cause__) if e.__cause__ else None, "error_type": type(e).__name__},
|
|
)
|
|
|
|
if isinstance(e, openai.RateLimitError):
|
|
logger.warning(f"[OpenAI] Rate limited (429). Consider backoff. Error: {e}")
|
|
return LLMRateLimitError(
|
|
message=f"Rate limited by OpenAI: {str(e)}",
|
|
code=ErrorCode.RATE_LIMIT_EXCEEDED,
|
|
details=e.body, # Include body which often has rate limit details
|
|
)
|
|
|
|
if isinstance(e, openai.BadRequestError):
|
|
logger.warning(f"[OpenAI] Bad request (400): {str(e)}")
|
|
# BadRequestError can signify different issues (e.g., invalid args, context length)
|
|
# Check for context_length_exceeded error code in the error body
|
|
error_code = None
|
|
if e.body and isinstance(e.body, dict):
|
|
error_details = e.body.get("error", {})
|
|
if isinstance(error_details, dict):
|
|
error_code = error_details.get("code")
|
|
|
|
# Check both the error code and message content for context length issues
|
|
if error_code == "context_length_exceeded" or is_context_window_overflow_message(str(e)):
|
|
return ContextWindowExceededError(
|
|
message=f"Bad request to OpenAI (context window exceeded): {str(e)}",
|
|
)
|
|
else:
|
|
return LLMBadRequestError(
|
|
message=f"Bad request to OpenAI: {str(e)}",
|
|
code=ErrorCode.INVALID_ARGUMENT, # Or more specific if detectable
|
|
details=e.body,
|
|
)
|
|
|
|
# NOTE: The OpenAI Python SDK may raise a generic `openai.APIError` while *iterating*
|
|
# over a stream (e.g. Responses API streaming). In this case we don't necessarily
|
|
# get a `BadRequestError` with a structured error body, but we still want to
|
|
# trigger Letta's context window compaction / retry logic.
|
|
#
|
|
# Example message:
|
|
# "Your input exceeds the context window of this model. Please adjust your input and try again."
|
|
if isinstance(e, openai.APIError):
|
|
msg = str(e)
|
|
if is_context_window_overflow_message(msg):
|
|
return ContextWindowExceededError(
|
|
message=f"OpenAI request exceeded the context window: {msg}",
|
|
details={
|
|
"provider_exception_type": type(e).__name__,
|
|
# Best-effort extraction (may not exist on APIError)
|
|
"body": getattr(e, "body", None),
|
|
},
|
|
)
|
|
|
|
if isinstance(e, openai.AuthenticationError):
|
|
logger.error(f"[OpenAI] Authentication error (401): {str(e)}") # More severe log level
|
|
return LLMAuthenticationError(
|
|
message=f"Authentication failed with OpenAI: {str(e)}", code=ErrorCode.UNAUTHENTICATED, details=e.body
|
|
)
|
|
|
|
if isinstance(e, openai.PermissionDeniedError):
|
|
logger.error(f"[OpenAI] Permission denied (403): {str(e)}") # More severe log level
|
|
return LLMPermissionDeniedError(
|
|
message=f"Permission denied by OpenAI: {str(e)}", code=ErrorCode.PERMISSION_DENIED, details=e.body
|
|
)
|
|
|
|
if isinstance(e, openai.NotFoundError):
|
|
logger.warning(f"[OpenAI] Resource not found (404): {str(e)}")
|
|
# Could be invalid model name, etc.
|
|
return LLMNotFoundError(message=f"Resource not found in OpenAI: {str(e)}", code=ErrorCode.NOT_FOUND, details=e.body)
|
|
|
|
if isinstance(e, openai.UnprocessableEntityError):
|
|
logger.warning(f"[OpenAI] Unprocessable entity (422): {str(e)}")
|
|
return LLMUnprocessableEntityError(
|
|
message=f"Invalid request content for OpenAI: {str(e)}",
|
|
code=ErrorCode.INVALID_ARGUMENT, # Usually validation errors
|
|
details=e.body,
|
|
)
|
|
|
|
# General API error catch-all
|
|
if isinstance(e, openai.APIStatusError):
|
|
logger.warning(f"[OpenAI] API status error ({e.status_code}): {str(e)}")
|
|
# Handle 413 Request Entity Too Large - request payload exceeds size limits
|
|
if e.status_code == 413:
|
|
return ContextWindowExceededError(
|
|
message=f"Request too large for OpenAI (413): {str(e)}",
|
|
)
|
|
# Map based on status code potentially
|
|
if e.status_code >= 500:
|
|
error_cls = LLMServerError
|
|
error_code = ErrorCode.INTERNAL_SERVER_ERROR
|
|
else:
|
|
# Treat other 4xx as bad requests if not caught above
|
|
error_cls = LLMBadRequestError
|
|
error_code = ErrorCode.INVALID_ARGUMENT
|
|
|
|
return error_cls(
|
|
message=f"OpenAI API error: {str(e)}",
|
|
code=error_code,
|
|
details={
|
|
"status_code": e.status_code,
|
|
"response": str(e.response),
|
|
"body": e.body,
|
|
},
|
|
)
|
|
|
|
# Fallback for unexpected errors
|
|
return super().handle_llm_error(e)
|
|
|
|
|
|
def fill_image_content_in_messages(openai_message_list: List[dict], pydantic_message_list: List[PydanticMessage]) -> List[dict]:
|
|
"""
|
|
Converts image content to openai format.
|
|
"""
|
|
|
|
if len(openai_message_list) != len(pydantic_message_list):
|
|
return openai_message_list
|
|
|
|
new_message_list = []
|
|
for idx in range(len(openai_message_list)):
|
|
openai_message, pydantic_message = openai_message_list[idx], pydantic_message_list[idx]
|
|
if pydantic_message.role != "user":
|
|
new_message_list.append(openai_message)
|
|
continue
|
|
|
|
if not isinstance(pydantic_message.content, list) or (
|
|
len(pydantic_message.content) == 1 and pydantic_message.content[0].type == MessageContentType.text
|
|
):
|
|
new_message_list.append(openai_message)
|
|
continue
|
|
|
|
message_content = []
|
|
for content in pydantic_message.content:
|
|
if content.type == MessageContentType.text:
|
|
message_content.append(
|
|
{
|
|
"type": "text",
|
|
"text": content.text,
|
|
}
|
|
)
|
|
elif content.type == MessageContentType.image:
|
|
message_content.append(
|
|
{
|
|
"type": "image_url",
|
|
"image_url": {
|
|
"url": f"data:{content.source.media_type};base64,{content.source.data}",
|
|
"detail": content.source.detail or "auto",
|
|
},
|
|
}
|
|
)
|
|
else:
|
|
raise ValueError(f"Unsupported content type {content.type}")
|
|
|
|
new_message_list.append({"role": "user", "content": message_content})
|
|
|
|
return new_message_list
|
|
|
|
|
|
def fill_image_content_in_responses_input(openai_message_list: List[dict], pydantic_message_list: List[PydanticMessage]) -> List[dict]:
|
|
"""
|
|
Rewrite user messages in the Responses API input to embed multi-modal parts inside
|
|
the message's content array (not as top-level items).
|
|
|
|
Expected structure for Responses API input messages:
|
|
{ "type": "message", "role": "user", "content": [
|
|
{"type": "input_text", "text": "..."},
|
|
{"type": "input_image", "image_url": {"url": "data:<mime>;base64,<data>", "detail": "auto"}}
|
|
] }
|
|
|
|
Non-user items are left unchanged.
|
|
"""
|
|
user_msgs = [m for m in pydantic_message_list if getattr(m, "role", None) == "user"]
|
|
user_idx = 0
|
|
|
|
rewritten: List[dict] = []
|
|
for item in openai_message_list:
|
|
if isinstance(item, dict) and item.get("role") == "user":
|
|
if user_idx >= len(user_msgs):
|
|
rewritten.append(item)
|
|
continue
|
|
|
|
pm = user_msgs[user_idx]
|
|
user_idx += 1
|
|
|
|
existing_content = item.get("content")
|
|
if _is_responses_style_content(existing_content):
|
|
rewritten.append(item)
|
|
continue
|
|
|
|
# Only rewrite if the pydantic message actually contains multiple parts or images
|
|
if not isinstance(pm.content, list) or (len(pm.content) == 1 and pm.content[0].type == MessageContentType.text):
|
|
rewritten.append(item)
|
|
continue
|
|
|
|
parts: List[dict] = []
|
|
for content in pm.content:
|
|
if content.type == MessageContentType.text:
|
|
parts.append({"type": "input_text", "text": content.text})
|
|
elif content.type == MessageContentType.image:
|
|
# For Responses API, image_url is a string and detail is required
|
|
data_url = f"data:{content.source.media_type};base64,{content.source.data}"
|
|
parts.append(
|
|
{"type": "input_image", "image_url": data_url, "detail": getattr(content.source, "detail", None) or "auto"}
|
|
)
|
|
else:
|
|
# Skip unsupported content types for Responses input
|
|
continue
|
|
|
|
# Update message content to include multi-modal parts (EasyInputMessageParam style)
|
|
new_item = dict(item)
|
|
new_item["content"] = parts
|
|
rewritten.append(new_item)
|
|
else:
|
|
rewritten.append(item)
|
|
|
|
return rewritten
|
|
|
|
|
|
def _is_responses_style_content(content: Optional[Any]) -> bool:
|
|
if not isinstance(content, list):
|
|
return False
|
|
|
|
allowed_types = {"input_text", "input_image"}
|
|
for part in content:
|
|
if not isinstance(part, dict):
|
|
return False
|
|
part_type = part.get("type")
|
|
if part_type not in allowed_types:
|
|
return False
|
|
return True
|