From 4bb116f17ccca1d86d73ab127802049e3ea1c704 Mon Sep 17 00:00:00 2001 From: cthomas Date: Thu, 20 Nov 2025 11:37:23 -0800 Subject: [PATCH] fix: sync api call in message path (#6291) * fix: sync api call in message path * remove unused function * add new error type --- letta/agents/ephemeral_summary_agent.py | 2 +- letta/agents/helpers.py | 50 +++---------------------- letta/agents/voice_agent.py | 4 +- letta/errors.py | 12 ++++++ letta/helpers/message_helper.py | 35 +++++++++++++---- letta/server/rest_api/utils.py | 6 ++- letta/services/summarizer/summarizer.py | 28 +++++++------- 7 files changed, 66 insertions(+), 71 deletions(-) diff --git a/letta/agents/ephemeral_summary_agent.py b/letta/agents/ephemeral_summary_agent.py index b73c3f26..1a5d7a77 100644 --- a/letta/agents/ephemeral_summary_agent.py +++ b/letta/agents/ephemeral_summary_agent.py @@ -78,7 +78,7 @@ class EphemeralSummaryAgent(BaseAgent): role=MessageRole.system, content=[TextContent(text=get_system_text("summary_system_prompt"))], ) - messages = convert_message_creates_to_messages( + messages = await convert_message_creates_to_messages( message_creates=[system_message_create] + input_messages, agent_id=self.agent_id, timezone=agent_state.timezone, diff --git a/letta/agents/helpers.py b/letta/agents/helpers.py index c5050b25..3e5aed0a 100644 --- a/letta/agents/helpers.py +++ b/letta/agents/helpers.py @@ -51,47 +51,6 @@ def _create_letta_response( return LettaResponse(messages=response_messages, stop_reason=stop_reason, usage=usage) -def _prepare_in_context_messages( - input_messages: List[MessageCreate], - agent_state: AgentState, - message_manager: MessageManager, - actor: User, - run_id: str, -) -> Tuple[List[Message], List[Message]]: - """ - Prepares in-context messages for an agent, based on the current state and a new user input. - - Args: - input_messages (List[MessageCreate]): The new user input messages to process. - agent_state (AgentState): The current state of the agent, including message buffer config. - message_manager (MessageManager): The manager used to retrieve and create messages. - actor (User): The user performing the action, used for access control and attribution. - run_id (str): The run ID associated with this message processing. - - Returns: - Tuple[List[Message], List[Message]]: A tuple containing: - - The current in-context messages (existing context for the agent). - - The new in-context messages (messages created from the new input). - """ - - if agent_state.message_buffer_autoclear: - # If autoclear is enabled, only include the most recent system message (usually at index 0) - current_in_context_messages = [message_manager.get_messages_by_ids(message_ids=agent_state.message_ids, actor=actor)[0]] - else: - # Otherwise, include the full list of messages by ID for context - current_in_context_messages = message_manager.get_messages_by_ids(message_ids=agent_state.message_ids, actor=actor) - - # Create a new user message from the input and store it - new_in_context_messages = message_manager.create_many_messages( - create_input_messages( - input_messages=input_messages, agent_id=agent_state.id, timezone=agent_state.timezone, run_id=run_id, actor=actor - ), - actor=actor, - ) - - return current_in_context_messages, new_in_context_messages - - async def _prepare_in_context_messages_async( input_messages: List[MessageCreate], agent_state: AgentState, @@ -124,10 +83,11 @@ async def _prepare_in_context_messages_async( current_in_context_messages = await message_manager.get_messages_by_ids_async(message_ids=agent_state.message_ids, actor=actor) # Create a new user message from the input and store it + input_msgs = await create_input_messages( + input_messages=input_messages, agent_id=agent_state.id, timezone=agent_state.timezone, run_id=run_id, actor=actor + ) new_in_context_messages = await message_manager.create_many_messages_async( - create_input_messages( - input_messages=input_messages, agent_id=agent_state.id, timezone=agent_state.timezone, run_id=run_id, actor=actor - ), + input_msgs, actor=actor, project_id=agent_state.project_id, ) @@ -201,7 +161,7 @@ async def _prepare_in_context_messages_no_persist_async( raise PendingApprovalError(pending_request_id=current_in_context_messages[-1].id) # Create a new user message from the input but dont store it yet - new_in_context_messages = create_input_messages( + new_in_context_messages = await create_input_messages( input_messages=input_messages, agent_id=agent_state.id, timezone=agent_state.timezone, run_id=run_id, actor=actor ) diff --git a/letta/agents/voice_agent.py b/letta/agents/voice_agent.py index 3002bc61..a173ff47 100644 --- a/letta/agents/voice_agent.py +++ b/letta/agents/voice_agent.py @@ -155,8 +155,8 @@ class VoiceAgent(BaseAgent): max_files_open=agent_state.max_files_open, llm_config=agent_state.llm_config, ) - letta_message_db_queue = create_input_messages( - input_messages=input_messages, agent_id=agent_state.id, timezone=agent_state.timezone, actor=self.actor + letta_message_db_queue = await create_input_messages( + input_messages=input_messages, agent_id=agent_state.id, timezone=agent_state.timezone, run_id=None, actor=self.actor ) in_memory_message_history = self.pre_process_input_message(input_messages) diff --git a/letta/errors.py b/letta/errors.py index e2c1074b..e99d1f5f 100644 --- a/letta/errors.py +++ b/letta/errors.py @@ -110,6 +110,18 @@ class LettaInvalidArgumentError(LettaError): super().__init__(message=message, code=ErrorCode.INVALID_ARGUMENT, details=details) +class LettaImageFetchError(LettaError): + """Error raised when fetching an image from a URL fails.""" + + def __init__(self, url: str, reason: str): + details = {"url": url, "reason": reason} + super().__init__( + message=f"Failed to fetch image from {url}: {reason}", + code=ErrorCode.INVALID_ARGUMENT, + details=details, + ) + + class LettaMCPError(LettaError): """Base error for MCP-related issues.""" diff --git a/letta/helpers/message_helper.py b/letta/helpers/message_helper.py index 6d6bdeaf..9ff05cff 100644 --- a/letta/helpers/message_helper.py +++ b/letta/helpers/message_helper.py @@ -1,3 +1,4 @@ +import asyncio import base64 import mimetypes from urllib.parse import unquote, urlparse @@ -5,12 +6,31 @@ from urllib.parse import unquote, urlparse import httpx from letta import system +from letta.errors import LettaImageFetchError from letta.schemas.enums import MessageRole from letta.schemas.letta_message_content import Base64Image, ImageContent, ImageSourceType, TextContent from letta.schemas.message import Message, MessageCreate -def convert_message_creates_to_messages( +async def _fetch_image_from_url(url: str) -> tuple[bytes, str | None]: + """ + Async helper to fetch image from URL without blocking the event loop. + """ + timeout = httpx.Timeout(15.0, connect=5.0) + try: + async with httpx.AsyncClient(timeout=timeout) as client: + image_response = await client.get(url, follow_redirects=True) + image_response.raise_for_status() + image_bytes = image_response.content + image_media_type = image_response.headers.get("content-type") + return image_bytes, image_media_type + except (httpx.RemoteProtocolError, httpx.TimeoutException, httpx.HTTPStatusError) as e: + raise LettaImageFetchError(url=url, reason=str(e)) + except Exception as e: + raise LettaImageFetchError(url=url, reason=f"Unexpected error: {e}") + + +async def convert_message_creates_to_messages( message_creates: list[MessageCreate], agent_id: str, timezone: str, @@ -18,7 +38,8 @@ def convert_message_creates_to_messages( wrap_user_message: bool = True, wrap_system_message: bool = True, ) -> list[Message]: - return [ + # Process all messages concurrently + tasks = [ _convert_message_create_to_message( message_create=create, agent_id=agent_id, @@ -29,9 +50,10 @@ def convert_message_creates_to_messages( ) for create in message_creates ] + return await asyncio.gather(*tasks) -def _convert_message_create_to_message( +async def _convert_message_create_to_message( message_create: MessageCreate, agent_id: str, timezone: str, @@ -85,11 +107,8 @@ def _convert_message_create_to_message( if not image_media_type: image_media_type = "image/jpeg" # default fallback else: - # Handle http(s):// URLs using httpx - image_response = httpx.get(url) - image_response.raise_for_status() - image_bytes = image_response.content - image_media_type = image_response.headers.get("content-type") + # Handle http(s):// URLs using async httpx + image_bytes, image_media_type = await _fetch_image_from_url(url) if not image_media_type: image_media_type, _ = mimetypes.guess_type(url) diff --git a/letta/server/rest_api/utils.py b/letta/server/rest_api/utils.py index aaa27212..de5134f1 100644 --- a/letta/server/rest_api/utils.py +++ b/letta/server/rest_api/utils.py @@ -155,7 +155,9 @@ def capture_sentry_exception(e: BaseException): sentry_sdk.capture_exception(e) -def create_input_messages(input_messages: List[MessageCreate], agent_id: str, timezone: str, run_id: str, actor: User) -> List[Message]: +async def create_input_messages( + input_messages: List[MessageCreate], agent_id: str, timezone: str, run_id: str, actor: User +) -> List[Message]: """ Converts a user input message into the internal structured format. @@ -163,7 +165,7 @@ def create_input_messages(input_messages: List[MessageCreate], agent_id: str, ti we should unify this when it's clear what message attributes we need. """ - messages = convert_message_creates_to_messages( + messages = await convert_message_creates_to_messages( input_messages, agent_id, timezone, run_id, wrap_user_message=False, wrap_system_message=False ) return messages diff --git a/letta/services/summarizer/summarizer.py b/letta/services/summarizer/summarizer.py index dc739855..055f9f96 100644 --- a/letta/services/summarizer/summarizer.py +++ b/letta/services/summarizer/summarizer.py @@ -183,19 +183,21 @@ class Summarizer: summary=summary_message_str, timezone=agent_state.timezone, ) - summary_message_obj = convert_message_creates_to_messages( - message_creates=[ - MessageCreate( - role=MessageRole.user, - content=[TextContent(text=summary_message_str_packed)], - ) - ], - agent_id=agent_state.id, - timezone=agent_state.timezone, - # We already packed, don't pack again - wrap_user_message=False, - wrap_system_message=False, - run_id=None, # TODO: add this + summary_message_obj = ( + await convert_message_creates_to_messages( + message_creates=[ + MessageCreate( + role=MessageRole.user, + content=[TextContent(text=summary_message_str_packed)], + ) + ], + agent_id=agent_state.id, + timezone=agent_state.timezone, + # We already packed, don't pack again + wrap_user_message=False, + wrap_system_message=False, + run_id=None, # TODO: add this + ) )[0] # Create the message in the DB