feat: Support natively async e2b (#1512)

This commit is contained in:
Matthew Zhou
2025-04-01 14:28:24 -07:00
committed by GitHub
parent e1c04c8605
commit 25b106d1cd
8 changed files with 1051 additions and 420 deletions

View File

@@ -1,397 +0,0 @@
import ast
import asyncio
import base64
import os
import pickle
import sys
import tempfile
import uuid
from typing import Any, Dict, Optional, Tuple
from letta.functions.helpers import generate_model_from_args_json_schema
from letta.schemas.agent import AgentState
from letta.schemas.sandbox_config import SandboxRunResult, SandboxType
from letta.services.helpers.tool_execution_helper import (
add_imports_and_pydantic_schemas_for_args,
create_venv_for_local_sandbox,
find_python_executable,
install_pip_requirements_for_sandbox,
)
from letta.services.organization_manager import OrganizationManager
from letta.services.sandbox_config_manager import SandboxConfigManager
from letta.services.tool_manager import ToolManager
from letta.tracing import log_event, trace_method
from letta.utils import get_friendly_error_msg
class AsyncToolExecutionSandbox:
METADATA_CONFIG_STATE_KEY = "config_state"
REQUIREMENT_TXT_NAME = "requirements.txt"
# For generating long, random marker hashes
NAMESPACE = uuid.NAMESPACE_DNS
LOCAL_SANDBOX_RESULT_START_MARKER = str(uuid.uuid5(NAMESPACE, "local-sandbox-result-start-marker"))
LOCAL_SANDBOX_RESULT_END_MARKER = str(uuid.uuid5(NAMESPACE, "local-sandbox-result-end-marker"))
# This is the variable name in the auto-generated code that contains the function results
# We make this a long random string to avoid collisions with any variables in the user's code
LOCAL_SANDBOX_RESULT_VAR_NAME = "result_ZQqiequkcFwRwwGQMqkt"
def __init__(self, tool_name: str, args: dict, user, force_recreate=True, force_recreate_venv=False, tool_object=None):
self.tool_name = tool_name
self.args = args
self.user = user
# get organization
self.organization = OrganizationManager().get_organization_by_id(self.user.organization_id)
self.privileged_tools = self.organization.privileged_tools
# If a tool object is provided, we use it directly, otherwise pull via name
if tool_object is not None:
self.tool = tool_object
else:
# Get the tool via name
self.tool = ToolManager().get_tool_by_name(tool_name=tool_name, actor=self.user)
if not self.tool:
raise ValueError(
f"Agent attempted to invoke tool {self.tool_name} that does not exist for organization {self.user.organization_id}"
)
self.sandbox_config_manager = SandboxConfigManager()
self.force_recreate = force_recreate
self.force_recreate_venv = force_recreate_venv
async def run(
self, agent_state: Optional[AgentState] = None, additional_env_vars: Optional[Dict] = None, inject_agent_state: bool = False
) -> SandboxRunResult:
"""
Run the tool in a sandbox environment asynchronously,
*always* using a subprocess for execution.
"""
result = await self.run_local_dir_sandbox(
agent_state=agent_state, additional_env_vars=additional_env_vars, inject_agent_state=inject_agent_state
)
# Simple console logging for demonstration
for log_line in (result.stdout or []) + (result.stderr or []):
print(f"Tool execution log: {log_line}")
return result
@trace_method
async def run_local_dir_sandbox(
self, agent_state: Optional[AgentState], additional_env_vars: Optional[Dict], inject_agent_state: bool
) -> SandboxRunResult:
"""
Unified asynchronougit pus method to run the tool in a local sandbox environment,
always via subprocess for multi-core parallelism.
"""
# Get sandbox configuration
sbx_config = self.sandbox_config_manager.get_or_create_default_sandbox_config(sandbox_type=SandboxType.LOCAL, actor=self.user)
local_configs = sbx_config.get_local_config()
use_venv = local_configs.use_venv
# Prepare environment variables
env = os.environ.copy()
env_vars = self.sandbox_config_manager.get_sandbox_env_vars_as_dict(sandbox_config_id=sbx_config.id, actor=self.user, limit=100)
env.update(env_vars)
if agent_state:
env.update(agent_state.get_agent_env_vars_as_dict())
if additional_env_vars:
env.update(additional_env_vars)
# Make sure sandbox directory exists
sandbox_dir = os.path.expanduser(local_configs.sandbox_dir)
if not os.path.exists(sandbox_dir) or not os.path.isdir(sandbox_dir):
os.makedirs(sandbox_dir)
# If using a virtual environment, ensure it's prepared in parallel
venv_preparation_task = None
if use_venv:
venv_path = str(os.path.join(sandbox_dir, local_configs.venv_name))
if self.force_recreate_venv or not os.path.isdir(venv_path):
venv_preparation_task = asyncio.create_task(self._prepare_venv(local_configs, venv_path, env))
# Generate and write execution script (always with markers, since we rely on stdout)
with tempfile.NamedTemporaryFile(mode="w", dir=sandbox_dir, suffix=".py", delete=False) as temp_file:
code = self.generate_execution_script(agent_state=agent_state, inject_agent_state=inject_agent_state)
temp_file.write(code)
temp_file.flush()
temp_file_path = temp_file.name
try:
# If we started a venv preparation task, wait for it to complete
if venv_preparation_task:
await venv_preparation_task
# Determine the python executable and environment for the subprocess
exec_env = env.copy()
if use_venv:
venv_path = str(os.path.join(sandbox_dir, local_configs.venv_name))
python_executable = find_python_executable(local_configs)
exec_env["VIRTUAL_ENV"] = venv_path
exec_env["PATH"] = os.path.join(venv_path, "bin") + ":" + exec_env["PATH"]
else:
# If not using venv, use whatever Python we are running on
python_executable = sys.executable
exec_env["PYTHONWARNINGS"] = "ignore"
# Execute in subprocess
return await self._execute_tool_subprocess(
sbx_config=sbx_config,
python_executable=python_executable,
temp_file_path=temp_file_path,
env=exec_env,
cwd=sandbox_dir,
)
except Exception as e:
print(f"Executing tool {self.tool_name} has an unexpected error: {e}")
print(f"Auto-generated code for debugging:\n\n{code}")
raise e
finally:
# Clean up the temp file
os.remove(temp_file_path)
async def _prepare_venv(self, local_configs, venv_path: str, env: Dict[str, str]):
"""
Prepare virtual environment asynchronously (in a background thread).
"""
sandbox_dir = os.path.expanduser(local_configs.sandbox_dir)
log_event(name="start create_venv_for_local_sandbox", attributes={"venv_path": venv_path})
await asyncio.to_thread(
create_venv_for_local_sandbox,
sandbox_dir_path=sandbox_dir,
venv_path=venv_path,
env=env,
force_recreate=self.force_recreate_venv,
)
log_event(name="finish create_venv_for_local_sandbox")
log_event(name="start install_pip_requirements_for_sandbox", attributes={"local_configs": local_configs.model_dump_json()})
await asyncio.to_thread(install_pip_requirements_for_sandbox, local_configs, upgrade=True, user_install_if_no_venv=False, env=env)
log_event(name="finish install_pip_requirements_for_sandbox", attributes={"local_configs": local_configs.model_dump_json()})
@trace_method
async def _execute_tool_subprocess(
self, sbx_config, python_executable: str, temp_file_path: str, env: Dict[str, str], cwd: str
) -> SandboxRunResult:
"""
Execute user code in a subprocess, always capturing stdout and stderr.
We parse special markers to extract the pickled result string.
"""
try:
log_event(name="start subprocess")
process = await asyncio.create_subprocess_exec(
python_executable, temp_file_path, env=env, cwd=cwd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
try:
stdout_bytes, stderr_bytes = await asyncio.wait_for(process.communicate(), timeout=60)
except asyncio.TimeoutError:
# Terminate the process on timeout
if process.returncode is None:
process.terminate()
try:
await asyncio.wait_for(process.wait(), timeout=5)
except asyncio.TimeoutError:
process.kill()
raise TimeoutError(f"Executing tool {self.tool_name} timed out after 60 seconds.")
stdout = stdout_bytes.decode("utf-8") if stdout_bytes else ""
stderr = stderr_bytes.decode("utf-8") if stderr_bytes else ""
log_event(name="finish subprocess")
# Parse markers to isolate the function result
func_result, stdout_text = self.parse_out_function_results_markers(stdout)
func_return, agent_state = self.parse_best_effort(func_result)
return SandboxRunResult(
func_return=func_return,
agent_state=agent_state,
stdout=[stdout_text] if stdout_text else [],
stderr=[stderr] if stderr else [],
status="success" if process.returncode == 0 else "error",
sandbox_config_fingerprint=sbx_config.fingerprint(),
)
except (TimeoutError, Exception) as e:
# Distinguish between timeouts and other exceptions for clarity
if isinstance(e, TimeoutError):
raise e
print(f"Subprocess execution for tool {self.tool_name} encountered an error: {e}")
func_return = get_friendly_error_msg(
function_name=self.tool_name,
exception_name=type(e).__name__,
exception_message=str(e),
)
return SandboxRunResult(
func_return=func_return,
agent_state=None,
stdout=[],
stderr=[str(e)],
status="error",
sandbox_config_fingerprint=sbx_config.fingerprint(),
)
def parse_out_function_results_markers(self, text: str) -> Tuple[str, str]:
"""
Parse the function results out of the stdout using special markers.
Returns (function_result_str, stripped_stdout).
"""
if self.LOCAL_SANDBOX_RESULT_START_MARKER not in text:
# No markers found, so nothing to parse
return "", text
marker_len = len(self.LOCAL_SANDBOX_RESULT_START_MARKER)
start_index = text.index(self.LOCAL_SANDBOX_RESULT_START_MARKER) + marker_len
end_index = text.index(self.LOCAL_SANDBOX_RESULT_END_MARKER)
# The actual pickled base64 is between start_index and end_index
results_str = text[start_index:end_index]
# The rest of stdout (minus the markers)
remainder = text[: start_index - marker_len] + text[end_index + marker_len :]
return results_str, remainder
def parse_best_effort(self, text: str) -> Tuple[Any, Optional[AgentState]]:
"""
Decode and unpickle the result from the function execution if possible.
Returns (function_return_value, agent_state).
"""
if not text:
return None, None
result = pickle.loads(base64.b64decode(text))
agent_state = result["agent_state"] if result["agent_state"] is not None else None
return result["results"], agent_state
def parse_function_arguments(self, source_code: str, tool_name: str) -> list:
"""
Get arguments of the given function from its source code via AST.
"""
tree = ast.parse(source_code)
args = []
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef) and node.name == tool_name:
for arg in node.args.args:
args.append(arg.arg)
return args
def generate_execution_script(self, agent_state: Optional[AgentState], inject_agent_state: bool) -> str:
"""
Generate code to run inside of execution sandbox.
Serialize the agent state and arguments, call the tool,
then base64-encode/pickle the result.
"""
code = "from typing import *\n"
code += "import pickle\n"
code += "import sys\n"
code += "import base64\n"
# Additional imports to support agent state
if inject_agent_state:
code += "import letta\n"
code += "from letta import * \n"
# Add schema code if available
if self.tool.args_json_schema:
schema_code = add_imports_and_pydantic_schemas_for_args(self.tool.args_json_schema)
if "from __future__ import annotations" in schema_code:
schema_code = schema_code.replace("from __future__ import annotations", "").lstrip()
code = "from __future__ import annotations\n\n" + code
code += schema_code + "\n"
# Load the agent state
if inject_agent_state:
agent_state_pickle = pickle.dumps(agent_state)
code += f"agent_state = pickle.loads({agent_state_pickle})\n"
else:
code += "agent_state = None\n"
# Initialize arguments
if self.tool.args_json_schema:
args_schema = generate_model_from_args_json_schema(self.tool.args_json_schema)
code += f"args_object = {args_schema.__name__}(**{self.args})\n"
for param in self.args:
code += f"{param} = args_object.{param}\n"
else:
for param in self.args:
code += self.initialize_param(param, self.args[param])
# Insert the tool's source code
code += "\n" + self.tool.source_code + "\n"
# Invoke the function and store the result in a global variable
code += (
f"{self.LOCAL_SANDBOX_RESULT_VAR_NAME}"
+ ' = {"results": '
+ self.invoke_function_call(inject_agent_state=inject_agent_state)
+ ', "agent_state": agent_state}\n'
)
code += (
f"{self.LOCAL_SANDBOX_RESULT_VAR_NAME} = base64.b64encode("
f"pickle.dumps({self.LOCAL_SANDBOX_RESULT_VAR_NAME})"
").decode('utf-8')\n"
)
# If we're always in a subprocess, we must rely on markers to parse out the result
code += f"sys.stdout.write('{self.LOCAL_SANDBOX_RESULT_START_MARKER}')\n"
code += f"sys.stdout.write(str({self.LOCAL_SANDBOX_RESULT_VAR_NAME}))\n"
code += f"sys.stdout.write('{self.LOCAL_SANDBOX_RESULT_END_MARKER}')\n"
return code
def _convert_param_to_value(self, param_type: str, raw_value: str) -> str:
"""
Convert parameter to Python code representation based on JSON schema type.
"""
if param_type == "string":
# Safely inject a Python string via pickle
value = "pickle.loads(" + str(pickle.dumps(raw_value)) + ")"
elif param_type in ["integer", "boolean", "number", "array", "object"]:
# This is simplistic. In real usage, ensure correct type-casting or sanitization.
value = raw_value
else:
raise TypeError(f"Unsupported type: {param_type}, raw_value={raw_value}")
return str(value)
def initialize_param(self, name: str, raw_value: str) -> str:
"""
Produce code for initializing a single parameter in the generated script.
"""
params = self.tool.json_schema["parameters"]["properties"]
spec = params.get(name)
if spec is None:
# Possibly an extra param like 'self' that we ignore
return ""
param_type = spec.get("type")
if param_type is None and spec.get("parameters"):
param_type = spec["parameters"].get("type")
value = self._convert_param_to_value(param_type, raw_value)
return f"{name} = {value}\n"
def invoke_function_call(self, inject_agent_state: bool) -> str:
"""
Generate the function call code string with the appropriate arguments.
"""
kwargs = []
for name in self.args:
if name in self.tool.json_schema["parameters"]["properties"]:
kwargs.append(name)
param_list = [f"{arg}={arg}" for arg in kwargs]
if inject_agent_state:
param_list.append("agent_state=agent_state")
params = ", ".join(param_list)
func_call_str = self.tool.name + "(" + params + ")"
return func_call_str

View File

@@ -387,9 +387,7 @@ class ToolExecutionSandbox:
sbx = Sandbox(sandbox_config.get_e2b_config().template, metadata={self.METADATA_CONFIG_STATE_KEY: state_hash})
else:
# no template
sbx = Sandbox(
metadata={self.METADATA_CONFIG_STATE_KEY: state_hash}, **e2b_config.model_dump(to_orm=True, exclude={"pip_requirements"})
)
sbx = Sandbox(metadata={self.METADATA_CONFIG_STATE_KEY: state_hash}, **e2b_config.model_dump(exclude={"pip_requirements"}))
# install pip requirements
if e2b_config.pip_requirements:

View File

@@ -1,4 +1,3 @@
import ast
import math
from abc import ABC, abstractmethod
from typing import Any, Optional, Tuple
@@ -15,7 +14,9 @@ from letta.schemas.user import User
from letta.services.agent_manager import AgentManager
from letta.services.message_manager import MessageManager
from letta.services.passage_manager import PassageManager
from letta.services.tool_executor.async_tool_execution_sandbox import AsyncToolExecutionSandbox
from letta.services.tool_sandbox.e2b_sandbox import AsyncToolSandboxE2B
from letta.services.tool_sandbox.local_sandbox import AsyncToolSandboxLocal
from letta.settings import tool_settings
from letta.utils import get_friendly_error_msg
@@ -334,16 +335,13 @@ class SandboxToolExecutor(ToolExecutor):
agent_state_copy = self._create_agent_state_copy(agent_state)
# TODO: This is brittle, think about better way to do this?
if "agent_state" in self.parse_function_arguments(tool.source_code, tool.name):
inject_agent_state = True
# Execute in sandbox depending on API key
if tool_settings.e2b_api_key:
sandbox = AsyncToolSandboxE2B(function_name, function_args, actor, tool_object=tool)
else:
inject_agent_state = False
sandbox = AsyncToolSandboxLocal(function_name, function_args, actor, tool_object=tool)
# Execute in sandbox
sandbox_run_result = await AsyncToolExecutionSandbox(function_name, function_args, actor, tool_object=tool).run(
agent_state=agent_state_copy, inject_agent_state=inject_agent_state
)
sandbox_run_result = await sandbox.run(agent_state=agent_state_copy)
function_response, updated_agent_state = sandbox_run_result.func_return, sandbox_run_result.agent_state
@@ -371,16 +369,6 @@ class SandboxToolExecutor(ToolExecutor):
# This is defensive programming - we try to coerce but fall back if it fails
return function_args
def parse_function_arguments(self, source_code: str, tool_name: str):
"""Get arguments of a function from its source code"""
tree = ast.parse(source_code)
args = []
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef) and node.name == tool_name:
for arg in node.args.args:
args.append(arg.arg)
return args
def _create_agent_state_copy(self, agent_state: AgentState):
"""Create a copy of agent state for sandbox execution."""
agent_state_copy = agent_state.__deepcopy__()

View File

View File

@@ -0,0 +1,190 @@
import ast
import base64
import pickle
import uuid
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Tuple
from letta.functions.helpers import generate_model_from_args_json_schema
from letta.schemas.agent import AgentState
from letta.schemas.sandbox_config import SandboxRunResult
from letta.services.helpers.tool_execution_helper import add_imports_and_pydantic_schemas_for_args
from letta.services.organization_manager import OrganizationManager
from letta.services.sandbox_config_manager import SandboxConfigManager
from letta.services.tool_manager import ToolManager
class AsyncToolSandboxBase(ABC):
NAMESPACE = uuid.NAMESPACE_DNS
LOCAL_SANDBOX_RESULT_START_MARKER = str(uuid.uuid5(NAMESPACE, "local-sandbox-result-start-marker"))
LOCAL_SANDBOX_RESULT_END_MARKER = str(uuid.uuid5(NAMESPACE, "local-sandbox-result-end-marker"))
LOCAL_SANDBOX_RESULT_VAR_NAME = "result_ZQqiequkcFwRwwGQMqkt"
def __init__(self, tool_name: str, args: dict, user, tool_object=None):
self.tool_name = tool_name
self.args = args
self.user = user
self.organization = OrganizationManager().get_organization_by_id(self.user.organization_id)
self.privileged_tools = self.organization.privileged_tools
self.tool = tool_object or ToolManager().get_tool_by_name(tool_name=tool_name, actor=self.user)
if self.tool is None:
raise ValueError(
f"Agent attempted to invoke tool {self.tool_name} that does not exist for organization {self.user.organization_id}"
)
self.sandbox_config_manager = SandboxConfigManager()
# See if we should inject agent_state or not based on the presence of the "agent_state" arg
if "agent_state" in self.parse_function_arguments(self.tool.source_code, self.tool.name):
self.inject_agent_state = True
else:
self.inject_agent_state = False
@abstractmethod
async def run(
self,
agent_state: Optional[AgentState] = None,
additional_env_vars: Optional[Dict] = None,
) -> SandboxRunResult:
"""
Run the tool in a sandbox environment asynchronously.
Must be implemented by subclasses.
"""
raise NotImplementedError
def generate_execution_script(self, agent_state: Optional[AgentState], wrap_print_with_markers: bool = False) -> str:
"""
Generate code to run inside of execution sandbox.
Serialize the agent state and arguments, call the tool,
then base64-encode/pickle the result.
"""
code = "from typing import *\n"
code += "import pickle\n"
code += "import sys\n"
code += "import base64\n"
# Additional imports to support agent state
if self.inject_agent_state:
code += "import letta\n"
code += "from letta import * \n"
# Add schema code if available
if self.tool.args_json_schema:
schema_code = add_imports_and_pydantic_schemas_for_args(self.tool.args_json_schema)
if "from __future__ import annotations" in schema_code:
schema_code = schema_code.replace("from __future__ import annotations", "").lstrip()
code = "from __future__ import annotations\n\n" + code
code += schema_code + "\n"
# Load the agent state
if self.inject_agent_state:
agent_state_pickle = pickle.dumps(agent_state)
code += f"agent_state = pickle.loads({agent_state_pickle})\n"
else:
code += "agent_state = None\n"
# Initialize arguments
if self.tool.args_json_schema:
args_schema = generate_model_from_args_json_schema(self.tool.args_json_schema)
code += f"args_object = {args_schema.__name__}(**{self.args})\n"
for param in self.args:
code += f"{param} = args_object.{param}\n"
else:
for param in self.args:
code += self.initialize_param(param, self.args[param])
# Insert the tool's source code
code += "\n" + self.tool.source_code + "\n"
# Invoke the function and store the result in a global variable
code += (
f"{self.LOCAL_SANDBOX_RESULT_VAR_NAME}" + ' = {"results": ' + self.invoke_function_call() + ', "agent_state": agent_state}\n'
)
code += (
f"{self.LOCAL_SANDBOX_RESULT_VAR_NAME} = base64.b64encode("
f"pickle.dumps({self.LOCAL_SANDBOX_RESULT_VAR_NAME})"
").decode('utf-8')\n"
)
if wrap_print_with_markers:
code += f"sys.stdout.write('{self.LOCAL_SANDBOX_RESULT_START_MARKER}')\n"
code += f"sys.stdout.write(str({self.LOCAL_SANDBOX_RESULT_VAR_NAME}))\n"
code += f"sys.stdout.write('{self.LOCAL_SANDBOX_RESULT_END_MARKER}')\n"
else:
code += f"{self.LOCAL_SANDBOX_RESULT_VAR_NAME}\n"
return code
def _convert_param_to_value(self, param_type: str, raw_value: str) -> str:
"""
Convert parameter to Python code representation based on JSON schema type.
"""
if param_type == "string":
# Safely inject a Python string via pickle
value = "pickle.loads(" + str(pickle.dumps(raw_value)) + ")"
elif param_type in ["integer", "boolean", "number", "array", "object"]:
# This is simplistic. In real usage, ensure correct type-casting or sanitization.
value = raw_value
else:
raise TypeError(f"Unsupported type: {param_type}, raw_value={raw_value}")
return str(value)
def initialize_param(self, name: str, raw_value: str) -> str:
"""
Produce code for initializing a single parameter in the generated script.
"""
params = self.tool.json_schema["parameters"]["properties"]
spec = params.get(name)
if spec is None:
# Possibly an extra param like 'self' that we ignore
return ""
param_type = spec.get("type")
if param_type is None and spec.get("parameters"):
param_type = spec["parameters"].get("type")
value = self._convert_param_to_value(param_type, raw_value)
return f"{name} = {value}\n"
def invoke_function_call(self) -> str:
"""
Generate the function call code string with the appropriate arguments.
"""
kwargs = []
for name in self.args:
if name in self.tool.json_schema["parameters"]["properties"]:
kwargs.append(name)
param_list = [f"{arg}={arg}" for arg in kwargs]
if self.inject_agent_state:
param_list.append("agent_state=agent_state")
params = ", ".join(param_list)
func_call_str = self.tool.name + "(" + params + ")"
return func_call_str
def parse_best_effort(self, text: str) -> Tuple[Any, Optional[AgentState]]:
"""
Decode and unpickle the result from the function execution if possible.
Returns (function_return_value, agent_state).
"""
if not text:
return None, None
result = pickle.loads(base64.b64decode(text))
print("LOOK HERE!")
print(result)
agent_state = result["agent_state"]
return result["results"], agent_state
def parse_function_arguments(self, source_code: str, tool_name: str):
"""Get arguments of a function from its source code"""
tree = ast.parse(source_code)
args = []
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef) and node.name == tool_name:
for arg in node.args.args:
args.append(arg.arg)
return args

View File

@@ -0,0 +1,116 @@
from typing import Dict, Optional
from letta.log import get_logger
from letta.schemas.agent import AgentState
from letta.schemas.sandbox_config import SandboxConfig, SandboxRunResult, SandboxType
from letta.services.tool_sandbox.base import AsyncToolSandboxBase
from letta.utils import get_friendly_error_msg
logger = get_logger(__name__)
class AsyncToolSandboxE2B(AsyncToolSandboxBase):
METADATA_CONFIG_STATE_KEY = "config_state"
def __init__(self, tool_name: str, args: dict, user, force_recreate=True, tool_object=None):
super().__init__(tool_name, args, user, tool_object)
self.force_recreate = force_recreate
async def run(
self,
agent_state: Optional[AgentState] = None,
additional_env_vars: Optional[Dict] = None,
) -> SandboxRunResult:
"""
Run the tool in a sandbox environment asynchronously,
*always* using a subprocess for execution.
"""
result = await self.run_e2b_sandbox(agent_state=agent_state, additional_env_vars=additional_env_vars)
# Simple console logging for demonstration
for log_line in (result.stdout or []) + (result.stderr or []):
print(f"Tool execution log: {log_line}")
return result
async def run_e2b_sandbox(
self, agent_state: Optional[AgentState] = None, additional_env_vars: Optional[Dict] = None
) -> SandboxRunResult:
sbx_config = self.sandbox_config_manager.get_or_create_default_sandbox_config(sandbox_type=SandboxType.E2B, actor=self.user)
# TODO: So this defaults to force recreating always
# TODO: Eventually, provision one sandbox PER agent, and that agent re-uses that one specifically
e2b_sandbox = await self.create_e2b_sandbox_with_metadata_hash(sandbox_config=sbx_config)
logger.info(f"E2B Sandbox configurations: {sbx_config}")
logger.info(f"E2B Sandbox ID: {e2b_sandbox.sandbox_id}")
# TODO: This only makes sense if we re-use sandboxes
# # Since this sandbox was used, we extend its lifecycle by the timeout
# await sbx.set_timeout(sbx_config.get_e2b_config().timeout)
# Get environment variables for the sandbox
# TODO: We set limit to 100 here, but maybe we want it uncapped? Realistically this should be fine.
env_vars = self.sandbox_config_manager.get_sandbox_env_vars_as_dict(sandbox_config_id=sbx_config.id, actor=self.user, limit=100)
# Get environment variables for this agent specifically
if agent_state:
env_vars.update(agent_state.get_agent_env_vars_as_dict())
# Finally, get any that are passed explicitly into the `run` function call
if additional_env_vars:
env_vars.update(additional_env_vars)
code = self.generate_execution_script(agent_state=agent_state)
execution = await e2b_sandbox.run_code(code, envs=env_vars)
if execution.results:
func_return, agent_state = self.parse_best_effort(execution.results[0].text)
elif execution.error:
logger.error(f"Executing tool {self.tool_name} raised a {execution.error.name} with message: \n{execution.error.value}")
logger.error(f"Traceback from e2b sandbox: \n{execution.error.traceback}")
func_return = get_friendly_error_msg(
function_name=self.tool_name, exception_name=execution.error.name, exception_message=execution.error.value
)
execution.logs.stderr.append(execution.error.traceback)
else:
raise ValueError(f"Tool {self.tool_name} returned execution with None")
return SandboxRunResult(
func_return=func_return,
agent_state=agent_state,
stdout=execution.logs.stdout,
stderr=execution.logs.stderr,
status="error" if execution.error else "success",
sandbox_config_fingerprint=sbx_config.fingerprint(),
)
def parse_exception_from_e2b_execution(self, e2b_execution: "Execution") -> Exception:
builtins_dict = __builtins__ if isinstance(__builtins__, dict) else vars(__builtins__)
# Dynamically fetch the exception class from builtins, defaulting to Exception if not found
exception_class = builtins_dict.get(e2b_execution.error.name, Exception)
return exception_class(e2b_execution.error.value)
async def create_e2b_sandbox_with_metadata_hash(self, sandbox_config: SandboxConfig) -> "Sandbox":
from e2b_code_interpreter import AsyncSandbox
state_hash = sandbox_config.fingerprint()
e2b_config = sandbox_config.get_e2b_config()
if e2b_config.template:
sbx = await AsyncSandbox.create(sandbox_config.get_e2b_config().template, metadata={self.METADATA_CONFIG_STATE_KEY: state_hash})
else:
# no template
sbx = await AsyncSandbox.create(
metadata={self.METADATA_CONFIG_STATE_KEY: state_hash}, **e2b_config.model_dump(exclude={"pip_requirements"})
)
# install pip requirements
if e2b_config.pip_requirements:
for package in e2b_config.pip_requirements:
await sbx.commands.run(f"pip install {package}")
return sbx
async def list_running_e2b_sandboxes(self):
from e2b_code_interpreter import AsyncSandbox
# List running sandboxes and access metadata.
return await AsyncSandbox.list()

View File

@@ -0,0 +1,221 @@
import asyncio
import os
import sys
import tempfile
from typing import Dict, Optional, Tuple
from letta.schemas.agent import AgentState
from letta.schemas.sandbox_config import SandboxRunResult, SandboxType
from letta.services.helpers.tool_execution_helper import (
create_venv_for_local_sandbox,
find_python_executable,
install_pip_requirements_for_sandbox,
)
from letta.services.tool_sandbox.base import AsyncToolSandboxBase
from letta.tracing import log_event, trace_method
from letta.utils import get_friendly_error_msg
class AsyncToolSandboxLocal(AsyncToolSandboxBase):
METADATA_CONFIG_STATE_KEY = "config_state"
REQUIREMENT_TXT_NAME = "requirements.txt"
def __init__(self, tool_name: str, args: dict, user, force_recreate_venv=False, tool_object=None):
super().__init__(tool_name, args, user, tool_object)
self.force_recreate_venv = force_recreate_venv
async def run(
self,
agent_state: Optional[AgentState] = None,
additional_env_vars: Optional[Dict] = None,
) -> SandboxRunResult:
"""
Run the tool in a sandbox environment asynchronously,
*always* using a subprocess for execution.
"""
result = await self.run_local_dir_sandbox(agent_state=agent_state, additional_env_vars=additional_env_vars)
# Simple console logging for demonstration
for log_line in (result.stdout or []) + (result.stderr or []):
print(f"Tool execution log: {log_line}")
return result
@trace_method
async def run_local_dir_sandbox(self, agent_state: Optional[AgentState], additional_env_vars: Optional[Dict]) -> SandboxRunResult:
"""
Unified asynchronougit pus method to run the tool in a local sandbox environment,
always via subprocess for multi-core parallelism.
"""
# Get sandbox configuration
sbx_config = self.sandbox_config_manager.get_or_create_default_sandbox_config(sandbox_type=SandboxType.LOCAL, actor=self.user)
local_configs = sbx_config.get_local_config()
use_venv = local_configs.use_venv
# Prepare environment variables
env = os.environ.copy()
env_vars = self.sandbox_config_manager.get_sandbox_env_vars_as_dict(sandbox_config_id=sbx_config.id, actor=self.user, limit=100)
env.update(env_vars)
if agent_state:
env.update(agent_state.get_agent_env_vars_as_dict())
if additional_env_vars:
env.update(additional_env_vars)
# Make sure sandbox directory exists
sandbox_dir = os.path.expanduser(local_configs.sandbox_dir)
if not os.path.exists(sandbox_dir) or not os.path.isdir(sandbox_dir):
os.makedirs(sandbox_dir)
# If using a virtual environment, ensure it's prepared in parallel
venv_preparation_task = None
if use_venv:
venv_path = str(os.path.join(sandbox_dir, local_configs.venv_name))
venv_preparation_task = asyncio.create_task(self._prepare_venv(local_configs, venv_path, env))
# Generate and write execution script (always with markers, since we rely on stdout)
with tempfile.NamedTemporaryFile(mode="w", dir=sandbox_dir, suffix=".py", delete=False) as temp_file:
code = self.generate_execution_script(agent_state=agent_state, wrap_print_with_markers=True)
temp_file.write(code)
temp_file.flush()
temp_file_path = temp_file.name
try:
# If we started a venv preparation task, wait for it to complete
if venv_preparation_task:
await venv_preparation_task
# Determine the python executable and environment for the subprocess
exec_env = env.copy()
if use_venv:
venv_path = str(os.path.join(sandbox_dir, local_configs.venv_name))
python_executable = find_python_executable(local_configs)
exec_env["VIRTUAL_ENV"] = venv_path
exec_env["PATH"] = os.path.join(venv_path, "bin") + ":" + exec_env["PATH"]
else:
# If not using venv, use whatever Python we are running on
python_executable = sys.executable
exec_env["PYTHONWARNINGS"] = "ignore"
# Execute in subprocess
return await self._execute_tool_subprocess(
sbx_config=sbx_config,
python_executable=python_executable,
temp_file_path=temp_file_path,
env=exec_env,
cwd=sandbox_dir,
)
except Exception as e:
print(f"Executing tool {self.tool_name} has an unexpected error: {e}")
print(f"Auto-generated code for debugging:\n\n{code}")
raise e
finally:
# Clean up the temp file
os.remove(temp_file_path)
async def _prepare_venv(self, local_configs, venv_path: str, env: Dict[str, str]):
"""
Prepare virtual environment asynchronously (in a background thread).
"""
if self.force_recreate_venv or not os.path.isdir(venv_path):
sandbox_dir = os.path.expanduser(local_configs.sandbox_dir)
log_event(name="start create_venv_for_local_sandbox", attributes={"venv_path": venv_path})
await asyncio.to_thread(
create_venv_for_local_sandbox,
sandbox_dir_path=sandbox_dir,
venv_path=venv_path,
env=env,
force_recreate=self.force_recreate_venv,
)
log_event(name="finish create_venv_for_local_sandbox")
log_event(name="start install_pip_requirements_for_sandbox", attributes={"local_configs": local_configs.model_dump_json()})
await asyncio.to_thread(install_pip_requirements_for_sandbox, local_configs, upgrade=True, user_install_if_no_venv=False, env=env)
log_event(name="finish install_pip_requirements_for_sandbox", attributes={"local_configs": local_configs.model_dump_json()})
@trace_method
async def _execute_tool_subprocess(
self, sbx_config, python_executable: str, temp_file_path: str, env: Dict[str, str], cwd: str
) -> SandboxRunResult:
"""
Execute user code in a subprocess, always capturing stdout and stderr.
We parse special markers to extract the pickled result string.
"""
try:
log_event(name="start subprocess")
process = await asyncio.create_subprocess_exec(
python_executable, temp_file_path, env=env, cwd=cwd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
try:
stdout_bytes, stderr_bytes = await asyncio.wait_for(process.communicate(), timeout=60)
except asyncio.TimeoutError:
# Terminate the process on timeout
if process.returncode is None:
process.terminate()
try:
await asyncio.wait_for(process.wait(), timeout=5)
except asyncio.TimeoutError:
process.kill()
raise TimeoutError(f"Executing tool {self.tool_name} timed out after 60 seconds.")
stdout = stdout_bytes.decode("utf-8") if stdout_bytes else ""
stderr = stderr_bytes.decode("utf-8") if stderr_bytes else ""
log_event(name="finish subprocess")
# Parse markers to isolate the function result
func_result, stdout_text = self.parse_out_function_results_markers(stdout)
func_return, agent_state = self.parse_best_effort(func_result)
return SandboxRunResult(
func_return=func_return,
agent_state=agent_state,
stdout=[stdout_text] if stdout_text else [],
stderr=[stderr] if stderr else [],
status="success" if process.returncode == 0 else "error",
sandbox_config_fingerprint=sbx_config.fingerprint(),
)
except (TimeoutError, Exception) as e:
# Distinguish between timeouts and other exceptions for clarity
if isinstance(e, TimeoutError):
raise e
print(f"Subprocess execution for tool {self.tool_name} encountered an error: {e}")
func_return = get_friendly_error_msg(
function_name=self.tool_name,
exception_name=type(e).__name__,
exception_message=str(e),
)
return SandboxRunResult(
func_return=func_return,
agent_state=None,
stdout=[],
stderr=[str(e)],
status="error",
sandbox_config_fingerprint=sbx_config.fingerprint(),
)
def parse_out_function_results_markers(self, text: str) -> Tuple[str, str]:
"""
Parse the function results out of the stdout using special markers.
Returns (function_result_str, stripped_stdout).
"""
if self.LOCAL_SANDBOX_RESULT_START_MARKER not in text:
# No markers found, so nothing to parse
return "", text
marker_len = len(self.LOCAL_SANDBOX_RESULT_START_MARKER)
start_index = text.index(self.LOCAL_SANDBOX_RESULT_START_MARKER) + marker_len
end_index = text.index(self.LOCAL_SANDBOX_RESULT_END_MARKER)
# The actual pickled base64 is between start_index and end_index
results_str = text[start_index:end_index]
# The rest of stdout (minus the markers)
remainder = text[: start_index - marker_len] + text[end_index + marker_len :]
return results_str, remainder

View File

@@ -0,0 +1,515 @@
import secrets
import string
import uuid
from pathlib import Path
from unittest.mock import patch
import pytest
from sqlalchemy import delete
from letta import create_client
from letta.functions.function_sets.base import core_memory_append, core_memory_replace
from letta.orm.sandbox_config import SandboxConfig, SandboxEnvironmentVariable
from letta.schemas.agent import AgentState
from letta.schemas.embedding_config import EmbeddingConfig
from letta.schemas.environment_variables import AgentEnvironmentVariable, SandboxEnvironmentVariableCreate
from letta.schemas.llm_config import LLMConfig
from letta.schemas.memory import ChatMemory
from letta.schemas.organization import Organization
from letta.schemas.sandbox_config import E2BSandboxConfig, LocalSandboxConfig, PipRequirement, SandboxConfigCreate
from letta.schemas.user import User
from letta.services.organization_manager import OrganizationManager
from letta.services.sandbox_config_manager import SandboxConfigManager
from letta.services.tool_manager import ToolManager
from letta.services.tool_sandbox.e2b_sandbox import AsyncToolSandboxE2B
from letta.services.tool_sandbox.local_sandbox import AsyncToolSandboxLocal
from letta.services.user_manager import UserManager
from tests.helpers.utils import create_tool_from_func
# Constants
namespace = uuid.NAMESPACE_DNS
org_name = str(uuid.uuid5(namespace, "test-tool-execution-sandbox-org"))
user_name = str(uuid.uuid5(namespace, "test-tool-execution-sandbox-user"))
# Fixtures
@pytest.fixture(autouse=True)
def clear_tables():
"""Fixture to clear the organization table before each test."""
from letta.server.db import db_context
with db_context() as session:
session.execute(delete(SandboxEnvironmentVariable))
session.execute(delete(SandboxConfig))
session.commit() # Commit the deletion
@pytest.fixture
def test_organization():
"""Fixture to create and return the default organization."""
org = OrganizationManager().create_organization(Organization(name=org_name))
yield org
@pytest.fixture
def test_user(test_organization):
"""Fixture to create and return the default user within the default organization."""
user = UserManager().create_user(User(name=user_name, organization_id=test_organization.id))
yield user
@pytest.fixture
def add_integers_tool(test_user):
def add(x: int, y: int) -> int:
"""
Simple function that adds two integers.
Parameters:
x (int): The first integer to add.
y (int): The second integer to add.
Returns:
int: The result of adding x and y.
"""
return x + y
tool = create_tool_from_func(add)
tool = ToolManager().create_or_update_tool(tool, test_user)
yield tool
@pytest.fixture
def cowsay_tool(test_user):
# This defines a tool for a package we definitely do NOT have in letta
# If this test passes, that means the tool was correctly executed in a separate Python environment
def cowsay() -> str:
"""
Simple function that uses the cowsay package to print out the secret word env variable.
Returns:
str: The cowsay ASCII art.
"""
import os
import cowsay
cowsay.cow(os.getenv("secret_word"))
tool = create_tool_from_func(cowsay)
tool = ToolManager().create_or_update_tool(tool, test_user)
yield tool
@pytest.fixture
def get_env_tool(test_user):
def get_env() -> str:
"""
Simple function that returns the secret word env variable.
Returns:
str: The secret word
"""
import os
secret_word = os.getenv("secret_word")
print(secret_word)
return secret_word
tool = create_tool_from_func(get_env)
tool = ToolManager().create_or_update_tool(tool, test_user)
yield tool
@pytest.fixture
def get_warning_tool(test_user):
def warn_hello_world() -> str:
"""
Simple function that warns hello world.
Returns:
str: hello world
"""
import warnings
msg = "Hello World"
warnings.warn(msg)
return msg
tool = create_tool_from_func(warn_hello_world)
tool = ToolManager().create_or_update_tool(tool, test_user)
yield tool
@pytest.fixture
def always_err_tool(test_user):
def error() -> str:
"""
Simple function that errors
Returns:
str: not important
"""
# Raise a unusual error so we know it's from this function
print("Going to error now")
raise ZeroDivisionError("This is an intentionally weird division!")
tool = create_tool_from_func(error)
tool = ToolManager().create_or_update_tool(tool, test_user)
yield tool
@pytest.fixture
def list_tool(test_user):
def create_list():
"""Simple function that returns a list"""
return [1] * 5
tool = create_tool_from_func(create_list)
tool = ToolManager().create_or_update_tool(tool, test_user)
yield tool
@pytest.fixture
def clear_core_memory_tool(test_user):
def clear_memory(agent_state: "AgentState"):
"""Clear the core memory"""
agent_state.memory.get_block("human").value = ""
agent_state.memory.get_block("persona").value = ""
tool = create_tool_from_func(clear_memory)
tool = ToolManager().create_or_update_tool(tool, test_user)
yield tool
@pytest.fixture
def external_codebase_tool(test_user):
from tests.test_tool_sandbox.restaurant_management_system.adjust_menu_prices import adjust_menu_prices
tool = create_tool_from_func(adjust_menu_prices)
tool = ToolManager().create_or_update_tool(tool, test_user)
yield tool
@pytest.fixture
def agent_state():
client = create_client()
agent_state = client.create_agent(
memory=ChatMemory(persona="This is the persona", human="My name is Chad"),
embedding_config=EmbeddingConfig.default_config(provider="openai"),
llm_config=LLMConfig.default_config(model_name="gpt-4o-mini"),
)
agent_state.tool_rules = []
yield agent_state
@pytest.fixture
def custom_test_sandbox_config(test_user):
"""
Fixture to create a consistent local sandbox configuration for tests.
Args:
test_user: The test user to be used for creating the sandbox configuration.
Returns:
A tuple containing the SandboxConfigManager and the created sandbox configuration.
"""
# Create the SandboxConfigManager
manager = SandboxConfigManager()
# Set the sandbox to be within the external codebase path and use a venv
external_codebase_path = str(Path(__file__).parent / "test_tool_sandbox" / "restaurant_management_system")
# tqdm is used in this codebase, but NOT in the requirements.txt, this tests that we can successfully install pip requirements
local_sandbox_config = LocalSandboxConfig(
sandbox_dir=external_codebase_path, use_venv=True, pip_requirements=[PipRequirement(name="tqdm")]
)
# Create the sandbox configuration
config_create = SandboxConfigCreate(config=local_sandbox_config.model_dump())
# Create or update the sandbox configuration
manager.create_or_update_sandbox_config(sandbox_config_create=config_create, actor=test_user)
return manager, local_sandbox_config
# Tool-specific fixtures
@pytest.fixture
def core_memory_tools(test_user):
"""Create all base tools for testing."""
tools = {}
for func in [
core_memory_replace,
core_memory_append,
]:
tool = create_tool_from_func(func)
tool = ToolManager().create_or_update_tool(tool, test_user)
tools[func.__name__] = tool
yield tools
# Local sandbox tests
@pytest.mark.asyncio
@pytest.mark.local_sandbox
async def test_local_sandbox_default(mock_e2b_api_key_none, add_integers_tool, test_user):
args = {"x": 10, "y": 5}
# Mock and assert correct pathway was invoked
with patch.object(AsyncToolSandboxLocal, "run_local_dir_sandbox") as mock_run_local_dir_sandbox:
sandbox = AsyncToolSandboxLocal(add_integers_tool.name, args, user=test_user)
await sandbox.run()
mock_run_local_dir_sandbox.assert_called_once()
# Run again to get actual response
sandbox = AsyncToolSandboxLocal(add_integers_tool.name, args, user=test_user)
result = await sandbox.run()
assert result.func_return == args["x"] + args["y"]
@pytest.mark.asyncio
@pytest.mark.local_sandbox
async def test_local_sandbox_stateful_tool(mock_e2b_api_key_none, clear_core_memory_tool, test_user, agent_state):
args = {}
sandbox = AsyncToolSandboxLocal(clear_core_memory_tool.name, args, user=test_user)
result = await sandbox.run(agent_state=agent_state)
assert sandbox.inject_agent_state == True
assert result.agent_state.memory.get_block("human").value == ""
assert result.agent_state.memory.get_block("persona").value == ""
assert result.func_return is None
@pytest.mark.asyncio
@pytest.mark.local_sandbox
async def test_local_sandbox_with_list_rv(mock_e2b_api_key_none, list_tool, test_user):
sandbox = AsyncToolSandboxLocal(list_tool.name, {}, user=test_user)
result = await sandbox.run()
assert len(result.func_return) == 5
@pytest.mark.asyncio
@pytest.mark.local_sandbox
async def test_local_sandbox_env(mock_e2b_api_key_none, get_env_tool, test_user):
manager = SandboxConfigManager()
sandbox_dir = str(Path(__file__).parent / "test_tool_sandbox")
config_create = SandboxConfigCreate(config=LocalSandboxConfig(sandbox_dir=sandbox_dir).model_dump())
config = manager.create_or_update_sandbox_config(config_create, test_user)
key = "secret_word"
long_random_string = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(20))
manager.create_sandbox_env_var(
SandboxEnvironmentVariableCreate(key=key, value=long_random_string), sandbox_config_id=config.id, actor=test_user
)
sandbox = AsyncToolSandboxLocal(get_env_tool.name, {}, user=test_user)
result = await sandbox.run()
assert long_random_string in result.func_return
@pytest.mark.asyncio
@pytest.mark.local_sandbox
async def test_local_sandbox_per_agent_env(mock_e2b_api_key_none, get_env_tool, agent_state, test_user):
manager = SandboxConfigManager()
key = "secret_word"
sandbox_dir = str(Path(__file__).parent / "test_tool_sandbox")
config_create = SandboxConfigCreate(config=LocalSandboxConfig(sandbox_dir=sandbox_dir).model_dump())
config = manager.create_or_update_sandbox_config(config_create, test_user)
wrong_val = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(20))
manager.create_sandbox_env_var(SandboxEnvironmentVariableCreate(key=key, value=wrong_val), sandbox_config_id=config.id, actor=test_user)
correct_val = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(20))
agent_state.tool_exec_environment_variables = [AgentEnvironmentVariable(key=key, value=correct_val, agent_id=agent_state.id)]
sandbox = AsyncToolSandboxLocal(get_env_tool.name, {}, user=test_user)
result = await sandbox.run(agent_state=agent_state)
assert wrong_val not in result.func_return
assert correct_val in result.func_return
@pytest.mark.asyncio
@pytest.mark.local_sandbox
async def test_local_sandbox_external_codebase_with_venv(
mock_e2b_api_key_none, custom_test_sandbox_config, external_codebase_tool, test_user
):
args = {"percentage": 10}
sandbox = AsyncToolSandboxLocal(external_codebase_tool.name, args, user=test_user)
result = await sandbox.run()
assert result.func_return == "Price Adjustments:\nBurger: $8.99 -> $9.89\nFries: $2.99 -> $3.29\nSoda: $1.99 -> $2.19"
assert "Hello World" in result.stdout[0]
@pytest.mark.asyncio
@pytest.mark.local_sandbox
async def test_local_sandbox_with_venv_and_warnings_does_not_error(
mock_e2b_api_key_none, custom_test_sandbox_config, get_warning_tool, test_user
):
sandbox = AsyncToolSandboxLocal(get_warning_tool.name, {}, user=test_user)
result = await sandbox.run()
assert result.func_return == "Hello World"
@pytest.mark.asyncio
@pytest.mark.e2b_sandbox
async def test_local_sandbox_with_venv_errors(mock_e2b_api_key_none, custom_test_sandbox_config, always_err_tool, test_user):
sandbox = AsyncToolSandboxLocal(always_err_tool.name, {}, user=test_user)
result = await sandbox.run()
assert len(result.stdout) != 0
assert "error" in result.stdout[0]
assert len(result.stderr) != 0
assert "ZeroDivisionError: This is an intentionally weird division!" in result.stderr[0]
@pytest.mark.asyncio
@pytest.mark.e2b_sandbox
async def test_local_sandbox_with_venv_pip_installs_basic(mock_e2b_api_key_none, cowsay_tool, test_user):
manager = SandboxConfigManager()
config_create = SandboxConfigCreate(
config=LocalSandboxConfig(use_venv=True, pip_requirements=[PipRequirement(name="cowsay")]).model_dump()
)
config = manager.create_or_update_sandbox_config(config_create, test_user)
key = "secret_word"
long_random_string = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(20))
manager.create_sandbox_env_var(
SandboxEnvironmentVariableCreate(key=key, value=long_random_string), sandbox_config_id=config.id, actor=test_user
)
sandbox = AsyncToolSandboxLocal(cowsay_tool.name, {}, user=test_user, force_recreate_venv=True)
result = await sandbox.run()
assert long_random_string in result.stdout[0]
@pytest.mark.asyncio
@pytest.mark.e2b_sandbox
async def test_local_sandbox_with_venv_pip_installs_with_update(mock_e2b_api_key_none, cowsay_tool, test_user):
manager = SandboxConfigManager()
config_create = SandboxConfigCreate(config=LocalSandboxConfig(use_venv=True).model_dump())
config = manager.create_or_update_sandbox_config(config_create, test_user)
key = "secret_word"
long_random_string = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(20))
manager.create_sandbox_env_var(
SandboxEnvironmentVariableCreate(key=key, value=long_random_string), sandbox_config_id=config.id, actor=test_user
)
sandbox = AsyncToolSandboxLocal(cowsay_tool.name, {}, user=test_user, force_recreate_venv=True)
result = await sandbox.run()
assert len(result.stdout) == 0
assert "No module named 'cowsay'" in result.stderr[0]
config_create = SandboxConfigCreate(
config=LocalSandboxConfig(use_venv=True, pip_requirements=[PipRequirement(name="cowsay")]).model_dump()
)
manager.create_or_update_sandbox_config(config_create, test_user)
sandbox = AsyncToolSandboxLocal(cowsay_tool.name, {}, user=test_user, force_recreate_venv=False)
result = await sandbox.run()
assert long_random_string in result.stdout[0]
# E2B sandbox tests
@pytest.mark.asyncio
@pytest.mark.e2b_sandbox
async def test_e2b_sandbox_default(check_e2b_key_is_set, add_integers_tool, test_user):
args = {"x": 10, "y": 5}
# Mock and assert correct pathway was invoked
with patch.object(AsyncToolSandboxE2B, "run_e2b_sandbox") as mock_run_local_dir_sandbox:
sandbox = AsyncToolSandboxE2B(add_integers_tool.name, args, user=test_user)
await sandbox.run()
mock_run_local_dir_sandbox.assert_called_once()
# Run again to get actual response
sandbox = AsyncToolSandboxE2B(add_integers_tool.name, args, user=test_user)
result = await sandbox.run()
assert int(result.func_return) == args["x"] + args["y"]
@pytest.mark.asyncio
@pytest.mark.e2b_sandbox
async def test_e2b_sandbox_pip_installs(check_e2b_key_is_set, cowsay_tool, test_user):
manager = SandboxConfigManager()
config_create = SandboxConfigCreate(config=E2BSandboxConfig(pip_requirements=["cowsay"]).model_dump())
config = manager.create_or_update_sandbox_config(config_create, test_user)
key = "secret_word"
long_random_string = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(20))
manager.create_sandbox_env_var(
SandboxEnvironmentVariableCreate(key=key, value=long_random_string),
sandbox_config_id=config.id,
actor=test_user,
)
sandbox = AsyncToolSandboxE2B(cowsay_tool.name, {}, user=test_user)
result = await sandbox.run()
assert long_random_string in result.stdout[0]
@pytest.mark.asyncio
@pytest.mark.e2b_sandbox
async def test_e2b_sandbox_stateful_tool(check_e2b_key_is_set, clear_core_memory_tool, test_user, agent_state):
sandbox = AsyncToolSandboxE2B(clear_core_memory_tool.name, {}, user=test_user)
result = await sandbox.run(agent_state=agent_state)
assert result.agent_state.memory.get_block("human").value == ""
assert result.agent_state.memory.get_block("persona").value == ""
assert result.func_return is None
@pytest.mark.asyncio
@pytest.mark.e2b_sandbox
async def test_e2b_sandbox_inject_env_var_existing_sandbox(check_e2b_key_is_set, get_env_tool, test_user):
manager = SandboxConfigManager()
config_create = SandboxConfigCreate(config=E2BSandboxConfig().model_dump())
config = manager.create_or_update_sandbox_config(config_create, test_user)
sandbox = AsyncToolSandboxE2B(get_env_tool.name, {}, user=test_user)
result = await sandbox.run()
assert result.func_return is None
key = "secret_word"
long_random_string = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(20))
manager.create_sandbox_env_var(
SandboxEnvironmentVariableCreate(key=key, value=long_random_string),
sandbox_config_id=config.id,
actor=test_user,
)
sandbox = AsyncToolSandboxE2B(get_env_tool.name, {}, user=test_user)
result = await sandbox.run()
assert long_random_string in result.func_return
@pytest.mark.asyncio
@pytest.mark.e2b_sandbox
async def test_e2b_sandbox_per_agent_env(check_e2b_key_is_set, get_env_tool, agent_state, test_user):
manager = SandboxConfigManager()
key = "secret_word"
wrong_val = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(20))
correct_val = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(20))
config_create = SandboxConfigCreate(config=LocalSandboxConfig().model_dump())
config = manager.create_or_update_sandbox_config(config_create, test_user)
manager.create_sandbox_env_var(
SandboxEnvironmentVariableCreate(key=key, value=wrong_val),
sandbox_config_id=config.id,
actor=test_user,
)
agent_state.tool_exec_environment_variables = [AgentEnvironmentVariable(key=key, value=correct_val, agent_id=agent_state.id)]
sandbox = AsyncToolSandboxE2B(get_env_tool.name, {}, user=test_user)
result = await sandbox.run(agent_state=agent_state)
assert wrong_val not in result.func_return
assert correct_val in result.func_return
@pytest.mark.asyncio
@pytest.mark.e2b_sandbox
async def test_e2b_sandbox_with_list_rv(check_e2b_key_is_set, list_tool, test_user):
sandbox = AsyncToolSandboxE2B(list_tool.name, {}, user=test_user)
result = await sandbox.run()
assert len(result.func_return) == 5