From 345ea42630b19461523cf950a95ed0085c16f004 Mon Sep 17 00:00:00 2001 From: cthomas Date: Thu, 20 Nov 2025 12:46:03 -0800 Subject: [PATCH] feat: offload all file i/o in server endpoints LET-6252 (#6300) feat: offload all file i/o in server endpoints --- letta/helpers/message_helper.py | 9 ++-- letta/server/rest_api/routers/v1/folders.py | 10 +++-- letta/server/rest_api/routers/v1/sources.py | 10 +++-- letta/server/rest_api/routers/v1/tools.py | 7 ++-- letta/server/server.py | 42 ++++++++++++------- letta/services/mcp_manager.py | 23 ++++++---- .../services/tool_sandbox/modal_sandbox_v2.py | 9 +++- 7 files changed, 71 insertions(+), 39 deletions(-) diff --git a/letta/helpers/message_helper.py b/letta/helpers/message_helper.py index 9ff05cff..95f8001b 100644 --- a/letta/helpers/message_helper.py +++ b/letta/helpers/message_helper.py @@ -98,9 +98,12 @@ async def _convert_message_create_to_message( parsed = urlparse(url) file_path = unquote(parsed.path) - # Read file directly from filesystem - with open(file_path, "rb") as f: - image_bytes = f.read() + # Read file directly from filesystem (wrapped to avoid blocking event loop) + def _read_file(): + with open(file_path, "rb") as f: + return f.read() + + image_bytes = await asyncio.to_thread(_read_file) # Guess media type from file extension image_media_type, _ = mimetypes.guess_type(file_path) diff --git a/letta/server/rest_api/routers/v1/folders.py b/letta/server/rest_api/routers/v1/folders.py index 1ebd09e3..9af648f2 100644 --- a/letta/server/rest_api/routers/v1/folders.py +++ b/letta/server/rest_api/routers/v1/folders.py @@ -1,3 +1,4 @@ +import asyncio import mimetypes import os import tempfile @@ -556,9 +557,12 @@ async def load_file_to_source_async(server: SyncServer, source_id: str, job_id: with tempfile.TemporaryDirectory() as tmpdirname: file_path = os.path.join(tmpdirname, filename) - # Write the file to the sanitized path - with open(file_path, "wb") as buffer: - buffer.write(bytes) + # Write the file to the sanitized path (wrapped to avoid blocking event loop) + def _write_file(): + with open(file_path, "wb") as buffer: + buffer.write(bytes) + + await asyncio.to_thread(_write_file) # Pass the file to load_file_to_source await server.load_file_to_source(source_id, file_path, job_id, actor) diff --git a/letta/server/rest_api/routers/v1/sources.py b/letta/server/rest_api/routers/v1/sources.py index e9d3b9d8..1f1586dc 100644 --- a/letta/server/rest_api/routers/v1/sources.py +++ b/letta/server/rest_api/routers/v1/sources.py @@ -1,3 +1,4 @@ +import asyncio import mimetypes import os import tempfile @@ -471,9 +472,12 @@ async def load_file_to_source_async(server: SyncServer, source_id: str, job_id: with tempfile.TemporaryDirectory() as tmpdirname: file_path = os.path.join(tmpdirname, filename) - # Write the file to the sanitized path - with open(file_path, "wb") as buffer: - buffer.write(bytes) + # Write the file to the sanitized path (wrapped to avoid blocking event loop) + def _write_file(): + with open(file_path, "wb") as buffer: + buffer.write(bytes) + + await asyncio.to_thread(_write_file) # Pass the file to load_file_to_source await server.load_file_to_source(source_id, file_path, job_id, actor) diff --git a/letta/server/rest_api/routers/v1/tools.py b/letta/server/rest_api/routers/v1/tools.py index c6cf0d87..16e8eab5 100644 --- a/letta/server/rest_api/routers/v1/tools.py +++ b/letta/server/rest_api/routers/v1/tools.py @@ -7,8 +7,7 @@ from httpx import ConnectError, HTTPStatusError from pydantic import BaseModel, Field from starlette.responses import StreamingResponse -from letta.constants import MAX_TOOL_NAME_LENGTH -from letta.constants import DEFAULT_GENERATE_TOOL_MODEL_HANDLE +from letta.constants import DEFAULT_GENERATE_TOOL_MODEL_HANDLE, MAX_TOOL_NAME_LENGTH from letta.errors import ( LettaInvalidArgumentError, LettaInvalidMCPSchemaError, @@ -365,7 +364,7 @@ async def list_mcp_servers( Get a list of all configured MCP servers """ if tool_settings.mcp_read_from_config: - return server.get_mcp_servers() + return await server.get_mcp_servers() else: actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) mcp_servers = await server.mcp_manager.list_mcp_servers(actor=actor) @@ -541,7 +540,7 @@ async def delete_mcp_server_from_config( """ if tool_settings.mcp_read_from_config: # write to config file - return server.delete_mcp_server_from_config(server_name=mcp_server_name) + return await server.delete_mcp_server_from_config(server_name=mcp_server_name) else: # log to DB actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) diff --git a/letta/server/server.py b/letta/server/server.py index f7577e68..07348978 100644 --- a/letta/server/server.py +++ b/letta/server/server.py @@ -1357,7 +1357,7 @@ class SyncServer(object): # MCP wrappers # TODO support both command + SSE servers (via config) - def get_mcp_servers(self) -> dict[str, Union[SSEServerConfig, StdioServerConfig]]: + async def get_mcp_servers(self) -> dict[str, Union[SSEServerConfig, StdioServerConfig]]: """List the MCP servers in the config (doesn't test that they are actually working)""" # TODO implement non-flatfile mechanism @@ -1370,12 +1370,16 @@ class SyncServer(object): # Attempt to read from ~/.letta/mcp_config.json mcp_config_path = os.path.join(constants.LETTA_DIR, constants.MCP_CONFIG_NAME) if os.path.exists(mcp_config_path): - with open(mcp_config_path, "r") as f: - try: - mcp_config = json.load(f) - except Exception as e: - logger.error(f"Failed to parse MCP config file ({mcp_config_path}) as json: {e}") - return mcp_server_list + + def _read_config(): + with open(mcp_config_path, "r") as f: + return json.load(f) + + try: + mcp_config = await asyncio.to_thread(_read_config) + except Exception as e: + logger.error(f"Failed to parse MCP config file ({mcp_config_path}) as json: {e}") + return mcp_server_list # Proper formatting is "mcpServers" key at the top level, # then a dict with the MCP server name as the key, @@ -1482,18 +1486,22 @@ class SyncServer(object): # Add to the server file current_mcp_servers[server_config.server_name] = server_config - # Write out the file, and make sure to in include the top-level mcpConfig + # Write out the file, and make sure to in include the top-level mcpConfig (wrapped to avoid blocking event loop) try: new_mcp_file = {MCP_CONFIG_TOPLEVEL_KEY: {k: v.to_dict() for k, v in current_mcp_servers.items()}} - with open(mcp_config_path, "w") as f: - json.dump(new_mcp_file, f, indent=4) + + def _write_config(): + with open(mcp_config_path, "w") as f: + json.dump(new_mcp_file, f, indent=4) + + await asyncio.to_thread(_write_config) except Exception as e: logger.error(f"Failed to write MCP config file at {mcp_config_path}: {e}") raise LettaInvalidArgumentError(f"Failed to write MCP config file {mcp_config_path}") return list(current_mcp_servers.values()) - def delete_mcp_server_from_config(self, server_name: str) -> dict[str, Union[SSEServerConfig, StdioServerConfig]]: + async def delete_mcp_server_from_config(self, server_name: str) -> dict[str, Union[SSEServerConfig, StdioServerConfig]]: """Delete a server config from the MCP config file""" # TODO implement non-flatfile mechanism @@ -1508,7 +1516,7 @@ class SyncServer(object): # If the file does exist, attempt to parse it get calling get_mcp_servers try: - current_mcp_servers = self.get_mcp_servers() + current_mcp_servers = await self.get_mcp_servers() except Exception as e: # Raise an error telling the user to fix the config file logger.error(f"Failed to parse MCP config file at {mcp_config_path}: {e}") @@ -1522,11 +1530,15 @@ class SyncServer(object): # Remove from the server file del current_mcp_servers[server_name] - # Write out the file, and make sure to in include the top-level mcpConfig + # Write out the file, and make sure to in include the top-level mcpConfig (wrapped to avoid blocking event loop) try: new_mcp_file = {MCP_CONFIG_TOPLEVEL_KEY: {k: v.to_dict() for k, v in current_mcp_servers.items()}} - with open(mcp_config_path, "w") as f: - json.dump(new_mcp_file, f, indent=4) + + def _write_config(): + with open(mcp_config_path, "w") as f: + json.dump(new_mcp_file, f, indent=4) + + await asyncio.to_thread(_write_config) except Exception as e: logger.error(f"Failed to write MCP config file at {mcp_config_path}: {e}") raise LettaInvalidArgumentError(f"Failed to write MCP config file {mcp_config_path}") diff --git a/letta/services/mcp_manager.py b/letta/services/mcp_manager.py index 97353c70..8fa443c6 100644 --- a/letta/services/mcp_manager.py +++ b/letta/services/mcp_manager.py @@ -1,3 +1,4 @@ +import asyncio import json import os import secrets @@ -118,7 +119,7 @@ class MCPManager: server_config = mcp_config.to_config(environment_variables) else: # read from config file - mcp_config = self.read_mcp_config() + mcp_config = await self.read_mcp_config() if mcp_server_name not in mcp_config: raise ValueError(f"MCP server {mcp_server_name} not found in config.") server_config = mcp_config[mcp_server_name] @@ -719,19 +720,23 @@ class MCPManager: logger.error(f"Failed to delete MCP server {mcp_server_id}: {e}") raise - def read_mcp_config(self) -> dict[str, Union[SSEServerConfig, StdioServerConfig, StreamableHTTPServerConfig]]: + async def read_mcp_config(self) -> dict[str, Union[SSEServerConfig, StdioServerConfig, StreamableHTTPServerConfig]]: mcp_server_list = {} # Attempt to read from ~/.letta/mcp_config.json mcp_config_path = os.path.join(constants.LETTA_DIR, constants.MCP_CONFIG_NAME) if os.path.exists(mcp_config_path): - with open(mcp_config_path, "r") as f: - try: - mcp_config = json.load(f) - except Exception as e: - # Config parsing errors are user configuration issues, not system errors - logger.warning(f"Failed to parse MCP config file ({mcp_config_path}) as json: {e}") - return mcp_server_list + # Read file without blocking event loop + def _read_config(): + with open(mcp_config_path, "r") as f: + return json.load(f) + + try: + mcp_config = await asyncio.to_thread(_read_config) + except Exception as e: + # Config parsing errors are user configuration issues, not system errors + logger.warning(f"Failed to parse MCP config file ({mcp_config_path}) as json: {e}") + return mcp_server_list # Proper formatting is "mcpServers" key at the top level, # then a dict with the MCP server name as the key, diff --git a/letta/services/tool_sandbox/modal_sandbox_v2.py b/letta/services/tool_sandbox/modal_sandbox_v2.py index c059cc8b..aca3f345 100644 --- a/letta/services/tool_sandbox/modal_sandbox_v2.py +++ b/letta/services/tool_sandbox/modal_sandbox_v2.py @@ -5,6 +5,7 @@ This runs tool calls within an isolated modal sandbox. This does this by doing t 3. tracking deployment versions to know when a deployment update is needed """ +import asyncio from typing import Any, Dict import modal @@ -106,8 +107,12 @@ class AsyncToolSandboxModalV2(AsyncToolSandboxBase): if not executor_path.exists(): raise ValueError(f"modal_executor.py not found at {executor_path}") - with open(executor_path, "r") as f: - f.read() + # Validate file is readable (wrapped to avoid blocking event loop) + def _validate_file(): + with open(executor_path, "r") as f: + f.read() + + await asyncio.to_thread(_validate_file) # Create a single file mount instead of directory mount # This avoids sys.path manipulation