Files
letta-server/letta/services/tool_sandbox/modal_sandbox_v2.py
Kian Jones f5c4ab50f4 chore: add ty + pre-commit hook and repeal even more ruff rules (#9504)
* auto fixes

* auto fix pt2 and transitive deps and undefined var checking locals()

* manual fixes (ignored or letta-code fixed)

* fix circular import

* remove all ignores, add FastAPI rules and Ruff rules

* add ty and precommit

* ruff stuff

* ty check fixes

* ty check fixes pt 2

* error on invalid
2026-02-24 10:55:11 -08:00

451 lines
19 KiB
Python

"""
Runs tool calls within an isolated Modal sandbox. It does so by:
1. deploying modal functions that embed the original functions
2. dynamically executing tools with arguments passed in at runtime
3. tracking deployment versions to know when a deployment update is needed
"""
import asyncio
from typing import Any, Dict
import modal
from letta.log import get_logger
from letta.otel.tracing import log_event, trace_method
from letta.schemas.agent import AgentState
from letta.schemas.enums import SandboxType
from letta.schemas.sandbox_config import SandboxConfig
from letta.schemas.tool import Tool
from letta.schemas.tool_execution_result import ToolExecutionResult
from letta.services.tool_sandbox.base import AsyncToolSandboxBase
from letta.services.tool_sandbox.modal_constants import DEFAULT_MAX_CONCURRENT_INPUTS, DEFAULT_PYTHON_VERSION
from letta.services.tool_sandbox.modal_deployment_manager import ModalDeploymentManager
from letta.services.tool_sandbox.modal_version_manager import ModalVersionManager
from letta.services.tool_sandbox.safe_pickle import SafePickleError, safe_pickle_dumps, sanitize_for_pickle
from letta.settings import tool_settings
from letta.types import JsonDict
from letta.utils import get_friendly_error_msg
# Module-level logger, named after this module via ``__name__``.
logger = get_logger(__name__)
class AsyncToolSandboxModalV2(AsyncToolSandboxBase):
    """Modal sandbox with dynamic argument passing and version tracking.

    A generic executor function is registered on a Modal app, and the tool
    source, pickled arguments, and environment variables are passed to it at
    call time. Deployment reuse and version bookkeeping are delegated to
    ``ModalDeploymentManager``.
    """

    # Total number of times a remote execution is attempted before giving up.
    _MAX_EXECUTION_ATTEMPTS = 3
    # Delay before the first retry; doubled after every failed attempt.
    _INITIAL_RETRY_DELAY_SECONDS = 1
    # Extra seconds granted on top of Modal's own timeout for the client-side wait.
    _TIMEOUT_BUFFER_SECONDS = 10
    # Lower-cased substrings of an exception message that mark a failure as retryable.
    _TRANSIENT_ERROR_MARKERS = ("segmentation fault", "sigsegv", "connection", "timeout")

    def __init__(
        self,
        tool_name: str,
        args: JsonDict,
        user,
        tool_id: str,
        agent_id: str | None = None,
        project_id: str | None = None,
        tool_object: Tool | None = None,
        sandbox_config: SandboxConfig | None = None,
        sandbox_env_vars: dict[str, Any] | None = None,
        version_manager: ModalVersionManager | None = None,
        use_locking: bool = True,
        use_version_tracking: bool = True,
    ):
        """Initialize the Modal sandbox.

        Args:
            tool_name: Name of the tool to execute.
            args: Arguments to pass to the tool.
            user: User/actor for permissions.
            tool_id: Tool ID for the tool being executed.
            agent_id: Agent ID (optional).
            project_id: Project ID for the tool execution (optional).
            tool_object: Tool object (optional).
            sandbox_config: Sandbox configuration (optional).
            sandbox_env_vars: Environment variables (optional).
            version_manager: Version manager, will create default if needed (optional).
            use_locking: Whether to use locking for deployment coordination (default: True).
            use_version_tracking: Whether to track and reuse deployments (default: True).

        Raises:
            ValueError: If the Modal token id or secret is missing from ``tool_settings``.
        """
        super().__init__(
            tool_name,
            args,
            user,
            tool_id=tool_id,
            agent_id=agent_id,
            project_id=project_id,
            tool_object=tool_object,
            sandbox_config=sandbox_config,
            sandbox_env_vars=sandbox_env_vars,
        )

        if not tool_settings.modal_token_id or not tool_settings.modal_token_secret:
            raise ValueError("MODAL_TOKEN_ID and MODAL_TOKEN_SECRET must be set.")

        # Initialize deployment manager with configurable options
        self._deployment_manager = ModalDeploymentManager(
            tool=self.tool,
            version_manager=version_manager,
            use_locking=use_locking,
            use_version_tracking=use_version_tracking,
        )
        # Populated by _get_or_deploy_modal_app(); None until then.
        self._version_hash: str | None = None

    async def _get_or_deploy_modal_app(self, sbx_config: SandboxConfig) -> modal.App:
        """Get existing Modal app or deploy a new version if needed.

        Records the version hash returned by the deployment manager on
        ``self._version_hash`` so later log events can include it.
        """
        app, version_hash = await self._deployment_manager.get_or_deploy_app(
            sbx_config=sbx_config,
            user=self.user,
            create_app_func=self._create_and_deploy_app,
        )
        self._version_hash = version_hash
        return app

    async def _create_and_deploy_app(self, sbx_config: SandboxConfig, version: str) -> modal.App:
        """Create and deploy a new Modal app with the executor function.

        Args:
            sbx_config: Sandbox configuration providing the Modal timeout and pip requirements.
            version: Version string used to derive the full app name.

        Returns:
            The locally constructed ``modal.App`` with ``tool_executor`` attached.

        Raises:
            ValueError: If the ``sandbox`` module or its ``modal_executor.py`` cannot be found.
            Exception: Any error raised while deploying is logged as an event and re-raised.
        """
        import importlib.util
        from pathlib import Path

        # App name = tool_id + version hash
        app_full_name = self._deployment_manager.get_full_app_name(version)
        app = modal.App(app_full_name)

        modal_config = sbx_config.get_modal_config()
        image = self._get_modal_image(sbx_config)

        # Find the sandbox module dynamically
        spec = importlib.util.find_spec("sandbox")
        if not spec or not spec.origin:
            raise ValueError("Could not find sandbox module")

        executor_path = Path(spec.origin).parent / "modal_executor.py"
        if not executor_path.exists():
            raise ValueError(f"modal_executor.py not found at {executor_path}")

        # Read the file once so permission/decoding problems surface before deploying.
        # Done in a worker thread to avoid blocking the event loop.
        await asyncio.to_thread(executor_path.read_text, encoding="utf-8")

        # Create a single file mount instead of directory mount
        # This avoids sys.path manipulation
        image = image.add_local_file(str(executor_path), remote_path="/modal_executor.py")

        # Register the executor function with Modal
        # NOTE(review): Modal documents `max_inputs` as the number of inputs a container
        # handles before it is recycled, not a concurrency limit — confirm that
        # DEFAULT_MAX_CONCURRENT_INPUTS is the intended value here.
        @app.function(
            image=image,
            timeout=modal_config.timeout,
            restrict_modal_access=True,
            max_inputs=DEFAULT_MAX_CONCURRENT_INPUTS,
            serialized=True,
        )
        def tool_executor(
            tool_source: str,
            tool_name: str,
            args_pickled: bytes,
            agent_state_pickled: bytes | None,
            inject_agent_state: bool,
            is_async: bool,
            args_schema_code: str | None,
            environment_vars: dict[str, Any],
        ) -> dict[str, Any]:
            """Execute tool in Modal container."""
            # Execute the modal_executor code in a clean, module-like namespace
            executor_namespace = {
                "__name__": "modal_executor",
                "__file__": "/modal_executor.py",
            }

            # Read and execute the module file mounted into the image above
            with open("/modal_executor.py", "r", encoding="utf-8") as f:
                exec(compile(f.read(), "/modal_executor.py", "exec"), executor_namespace)

            # Call the wrapper function from the executed namespace
            return executor_namespace["execute_tool_wrapper"](
                tool_source=tool_source,
                tool_name=tool_name,
                args_pickled=args_pickled,
                agent_state_pickled=agent_state_pickled,
                inject_agent_state=inject_agent_state,
                is_async=is_async,
                args_schema_code=args_schema_code,
                environment_vars=environment_vars,
            )

        # Store the function reference so run() can reach it via the app object
        app.tool_executor = tool_executor

        # Deploy the app
        logger.info(f"Deploying Modal app {app_full_name}")
        log_event("modal_v2_deploy_started", {"app_name": app_full_name, "version": version})

        try:
            # Try to look up the app first to see if it already exists
            try:
                await modal.App.lookup.aio(app_full_name)
            except Exception as lookup_error:
                # Best effort: any lookup failure is treated as "not deployed yet",
                # but leave a trace instead of swallowing it silently.
                logger.debug(f"Modal app lookup for {app_full_name} failed, deploying: {lookup_error}")
            else:
                logger.info(f"Modal app {app_full_name} already exists, skipping deployment")
                log_event("modal_v2_deploy_already_exists", {"app_name": app_full_name, "version": version})
                # Return the created app with the function attached
                return app

            with modal.enable_output():
                await app.deploy.aio()

            log_event("modal_v2_deploy_succeeded", {"app_name": app_full_name, "version": version})
        except Exception as e:
            log_event("modal_v2_deploy_failed", {"app_name": app_full_name, "version": version, "error": str(e)})
            raise

        return app

    def _pickle_args(self) -> bytes:
        """Serialize ``self.args``, degrading step by step if pickling fails.

        Tries the raw arguments first, then a sanitized copy, and finally the
        string representation of the arguments.
        """
        try:
            return safe_pickle_dumps(self.args)
        except SafePickleError as e:
            logger.warning(f"Failed to pickle args, attempting sanitization: {e}")

        sanitized_args = sanitize_for_pickle(self.args)
        try:
            return safe_pickle_dumps(sanitized_args)
        except SafePickleError:
            # Final fallback: convert to string representation
            return safe_pickle_dumps(str(self.args))

    def _pickle_agent_state(self, agent_state: AgentState | None) -> bytes | None:
        """Serialize the agent state when injection is requested.

        Returns ``None`` when injection is not requested, no agent state was
        given, or pickling fails. On a pickling failure injection is switched
        off on the instance: skipping injection is preferred over sending
        corrupted data.
        """
        if not (self.inject_agent_state and agent_state):
            return None

        try:
            return safe_pickle_dumps(agent_state)
        except SafePickleError as e:
            logger.warning(f"Failed to pickle agent state: {e}")
            self.inject_agent_state = False
            return None

    def _is_transient_error(self, error: Exception) -> bool:
        """Return True when the error message contains a known transient-failure marker."""
        message = str(error).lower()
        return any(marker in message for marker in self._TRANSIENT_ERROR_MARKERS)

    async def _execute_with_retries(self, app: modal.App, timeout: float, **executor_kwargs: Any) -> dict[str, Any]:
        """Invoke the remote executor, retrying timeouts and transient errors.

        Args:
            app: Modal app carrying the ``tool_executor`` function.
            timeout: Client-side limit, in seconds, applied to each attempt.
            **executor_kwargs: Keyword arguments forwarded to ``tool_executor``.

        Returns:
            The result dictionary produced by the remote executor.

        Raises:
            asyncio.TimeoutError: If the final attempt times out.
            Exception: A non-transient error immediately, or a transient one
                once all attempts are used up.
        """
        max_attempts = self._MAX_EXECUTION_ATTEMPTS
        retry_delay = self._INITIAL_RETRY_DELAY_SECONDS

        for attempt in range(1, max_attempts + 1):
            try:
                # Client-side timeout prevents hanging if Modal never answers
                return await asyncio.wait_for(
                    app.tool_executor.remote.aio(**executor_kwargs),
                    timeout=timeout,
                )
            except asyncio.TimeoutError:
                logger.warning(f"Modal execution timeout on attempt {attempt}/{max_attempts} for tool {self.tool_name}")
                if attempt == max_attempts:
                    raise
            except Exception as e:
                # Non-transient error, don't retry
                if not self._is_transient_error(e):
                    raise
                logger.warning(f"Transient error on attempt {attempt}/{max_attempts} for tool {self.tool_name}: {e}")
                if attempt == max_attempts:
                    raise

            await asyncio.sleep(retry_delay)
            retry_delay *= 2  # Exponential backoff

        # Only reachable when the attempt count is misconfigured below 1.
        raise ValueError("_MAX_EXECUTION_ATTEMPTS must be at least 1")

    def _build_execution_result(self, result: dict[str, Any], sbx_config: SandboxConfig) -> ToolExecutionResult:
        """Convert the executor's result dictionary into a ``ToolExecutionResult``.

        Emits a failure or success event along the way. ``stdout`` and
        ``stderr`` are read defensively, so a missing stream does not turn a
        successful execution into an error.
        """
        stdout = result.get("stdout") or ""
        stderr = result.get("stderr") or ""
        error = result["error"]

        if error:
            logger.debug(f"Tool {self.tool_name} raised a {error['name']}: {error['value']}")
            logger.debug(f"Traceback from Modal sandbox: \n{error['traceback']}")

            # Check for segfault indicators
            error_value_text = str(error["value"])
            is_segfault = "SIGSEGV" in error_value_text or "Segmentation fault" in error_value_text
            if is_segfault:
                logger.error(f"SEGFAULT detected in tool {self.tool_name}: {error['value']}")

            func_return = get_friendly_error_msg(
                function_name=self.tool_name,
                exception_name=error["name"],
                exception_message=error["value"],
            )
            log_event(
                "modal_execution_failed",
                {
                    "tool": self.tool_name,
                    "app_name": self._deployment_manager._app_name,
                    "version": self._version_hash,
                    "error_type": error["name"],
                    "error_message": error["value"],
                    "func_return": func_return,
                    "is_segfault": is_segfault,
                    "stdout": stdout,
                    "stderr": stderr,
                },
            )
            status = "error"
            returned_agent_state = None
        else:
            func_return = result["result"]
            returned_agent_state = result["agent_state"]
            log_event(
                "modal_v2_execution_succeeded",
                {
                    "tool": self.tool_name,
                    "app_name": self._deployment_manager._app_name,
                    "version": self._version_hash,
                    "func_return": str(func_return)[:500],  # Limit logged result size
                    "stdout_size": len(stdout),
                    "stderr_size": len(stderr),
                },
            )
            status = "success"

        return ToolExecutionResult(
            func_return=func_return,
            agent_state=returned_agent_state,
            stdout=[stdout] if stdout else [],
            stderr=[stderr] if stderr else [],
            status=status,
            sandbox_config_fingerprint=sbx_config.fingerprint(),
        )

    @trace_method
    async def run(
        self,
        agent_state: AgentState | None = None,
        additional_env_vars: dict | None = None,
    ) -> ToolExecutionResult:
        """Execute the tool in Modal sandbox with dynamic argument passing.

        Args:
            agent_state: Agent state to inject into the tool, if the tool requests it.
            additional_env_vars: Extra environment variables merged in when gathering
                the sandbox environment.

        Returns:
            A ``ToolExecutionResult``. Failures during deployment or remote execution
            are caught and reported with ``status="error"`` rather than raised.
        """
        if self.provided_sandbox_config:
            sbx_config = self.provided_sandbox_config
        else:
            sbx_config = await self.sandbox_config_manager.get_or_create_default_sandbox_config_async(
                sandbox_type=SandboxType.MODAL, actor=self.user
            )

        envs = await self._gather_env_vars(agent_state, additional_env_vars or {}, sbx_config.id, is_local=False)

        # Prepare schema code if needed
        args_schema_code = None
        if self.tool.args_json_schema:
            from letta.services.helpers.tool_execution_helper import add_imports_and_pydantic_schemas_for_args

            args_schema_code = add_imports_and_pydantic_schemas_for_args(self.tool.args_json_schema)

        # Serialize arguments and agent state with safety checks
        args_pickled = self._pickle_args()
        agent_state_pickled = self._pickle_agent_state(agent_state)

        try:
            # NOTE(review): self._version_hash is only set by _get_or_deploy_modal_app()
            # below, so on a fresh instance this event carries version=None.
            log_event(
                "modal_execution_started",
                {
                    "tool": self.tool_name,
                    "app_name": self._deployment_manager._app_name,
                    "version": self._version_hash,
                    "env_vars": list(envs),
                    "args_size": len(args_pickled),
                    "agent_state_size": len(agent_state_pickled) if agent_state_pickled else 0,
                    "inject_agent_state": self.inject_agent_state,
                },
            )

            # Get or deploy the Modal app
            app = await self._get_or_deploy_modal_app(sbx_config)

            # Get modal config for timeout settings
            modal_config = sbx_config.get_modal_config()

            # Execute the tool remotely with retry logic
            result = await self._execute_with_retries(
                app,
                # Buffer on top of Modal's own timeout so Modal normally fires first
                modal_config.timeout + self._TIMEOUT_BUFFER_SECONDS,
                tool_source=self.tool.source_code,
                tool_name=self.tool.name,
                args_pickled=args_pickled,
                agent_state_pickled=agent_state_pickled,
                inject_agent_state=self.inject_agent_state,
                is_async=self.is_async_function,
                args_schema_code=args_schema_code,
                environment_vars=envs,
            )

            # Process the result
            return self._build_execution_result(result, sbx_config)

        except Exception as e:
            import traceback

            error_type = type(e).__name__
            error_message = str(e)
            formatted_traceback = traceback.format_exc()

            error_context = {
                "tool": self.tool_name,
                "app_name": self._deployment_manager._app_name,
                "version": self._version_hash,
                "error_type": error_type,
                "error_message": error_message,
                "traceback": formatted_traceback,
            }

            logger.error(f"Modal V2 execution for tool {self.tool_name} encountered an error: {e}", extra=error_context)

            # Determine if this is a deployment error or execution error
            lowered_message = error_message.lower()
            if "deploy" in lowered_message or "modal" in lowered_message:
                error_category = "deployment_error"
            else:
                error_category = "execution_error"

            func_return = get_friendly_error_msg(
                function_name=self.tool_name,
                exception_name=error_type,
                exception_message=error_message,
            )
            log_event(f"modal_v2_{error_category}", error_context)

            return ToolExecutionResult(
                func_return=func_return,
                agent_state=None,
                stdout=[],
                stderr=[f"{error_type}: {error_message}\n{formatted_traceback}"],
                status="error",
                sandbox_config_fingerprint=sbx_config.fingerprint(),
            )

    def _get_modal_image(self, sbx_config: SandboxConfig) -> modal.Image:
        """Get Modal image with required public python dependencies.

        Caching and rebuilding is handled in a cascading manner
        https://modal.com/docs/guide/images#image-caching-and-rebuilds

        Args:
            sbx_config: Sandbox configuration whose Modal config may list extra pip requirements.

        Returns:
            A Debian-slim based image with build tooling and all pip requirements installed.
        """
        # Start with a more robust base image with development tools
        image = modal.Image.debian_slim(python_version=DEFAULT_PYTHON_VERSION)

        # Add system packages for better C extension support
        image = image.apt_install(
            "build-essential",  # Compilation tools
            "libsqlite3-dev",  # SQLite development headers
            "libffi-dev",  # Foreign Function Interface library
            "libssl-dev",  # OpenSSL development headers
            "python3-dev",  # Python development headers
        )

        # Include dependencies required by letta's ORM modules
        # These are needed when unpickling agent_state objects
        all_requirements = [
            "letta",
            "sqlite-vec>=0.1.7a2",  # Required for SQLite vector operations
            "numpy<2.0",  # Pin numpy to avoid compatibility issues
        ]

        # Add sandbox-specific pip requirements
        modal_configs = sbx_config.get_modal_config()
        if modal_configs.pip_requirements:
            all_requirements.extend(str(req) for req in modal_configs.pip_requirements)

        # Add tool-specific pip requirements
        if self.tool and self.tool.pip_requirements:
            all_requirements.extend(str(req) for req in self.tool.pip_requirements)

        # The base requirements above guarantee the list is never empty.
        return image.pip_install(*all_requirements)