From 25b106d1cd4351ec6e6b4f20e507c346cbfbf735 Mon Sep 17 00:00:00 2001 From: Matthew Zhou Date: Tue, 1 Apr 2025 14:28:24 -0700 Subject: [PATCH] feat: Support natively async e2b (#1512) --- .../async_tool_execution_sandbox.py | 397 -------------- .../tool_executor/tool_execution_sandbox.py | 4 +- letta/services/tool_executor/tool_executor.py | 28 +- letta/services/tool_sandbox/__init__.py | 0 letta/services/tool_sandbox/base.py | 190 +++++++ letta/services/tool_sandbox/e2b_sandbox.py | 116 ++++ letta/services/tool_sandbox/local_sandbox.py | 221 ++++++++ tests/integration_test_async_tool_sandbox.py | 515 ++++++++++++++++++ 8 files changed, 1051 insertions(+), 420 deletions(-) delete mode 100644 letta/services/tool_executor/async_tool_execution_sandbox.py create mode 100644 letta/services/tool_sandbox/__init__.py create mode 100644 letta/services/tool_sandbox/base.py create mode 100644 letta/services/tool_sandbox/e2b_sandbox.py create mode 100644 letta/services/tool_sandbox/local_sandbox.py create mode 100644 tests/integration_test_async_tool_sandbox.py diff --git a/letta/services/tool_executor/async_tool_execution_sandbox.py b/letta/services/tool_executor/async_tool_execution_sandbox.py deleted file mode 100644 index 6b957a97..00000000 --- a/letta/services/tool_executor/async_tool_execution_sandbox.py +++ /dev/null @@ -1,397 +0,0 @@ -import ast -import asyncio -import base64 -import os -import pickle -import sys -import tempfile -import uuid -from typing import Any, Dict, Optional, Tuple - -from letta.functions.helpers import generate_model_from_args_json_schema -from letta.schemas.agent import AgentState -from letta.schemas.sandbox_config import SandboxRunResult, SandboxType -from letta.services.helpers.tool_execution_helper import ( - add_imports_and_pydantic_schemas_for_args, - create_venv_for_local_sandbox, - find_python_executable, - install_pip_requirements_for_sandbox, -) -from letta.services.organization_manager import OrganizationManager -from letta.services.sandbox_config_manager import SandboxConfigManager -from letta.services.tool_manager import ToolManager -from letta.tracing import log_event, trace_method -from letta.utils import get_friendly_error_msg - - -class AsyncToolExecutionSandbox: - METADATA_CONFIG_STATE_KEY = "config_state" - REQUIREMENT_TXT_NAME = "requirements.txt" - - # For generating long, random marker hashes - NAMESPACE = uuid.NAMESPACE_DNS - LOCAL_SANDBOX_RESULT_START_MARKER = str(uuid.uuid5(NAMESPACE, "local-sandbox-result-start-marker")) - LOCAL_SANDBOX_RESULT_END_MARKER = str(uuid.uuid5(NAMESPACE, "local-sandbox-result-end-marker")) - - # This is the variable name in the auto-generated code that contains the function results - # We make this a long random string to avoid collisions with any variables in the user's code - LOCAL_SANDBOX_RESULT_VAR_NAME = "result_ZQqiequkcFwRwwGQMqkt" - - def __init__(self, tool_name: str, args: dict, user, force_recreate=True, force_recreate_venv=False, tool_object=None): - self.tool_name = tool_name - self.args = args - self.user = user - # get organization - self.organization = OrganizationManager().get_organization_by_id(self.user.organization_id) - self.privileged_tools = self.organization.privileged_tools - - # If a tool object is provided, we use it directly, otherwise pull via name - if tool_object is not None: - self.tool = tool_object - else: - # Get the tool via name - self.tool = ToolManager().get_tool_by_name(tool_name=tool_name, actor=self.user) - if not self.tool: - raise ValueError( - f"Agent attempted to invoke tool {self.tool_name} that does not exist for organization {self.user.organization_id}" - ) - - self.sandbox_config_manager = SandboxConfigManager() - self.force_recreate = force_recreate - self.force_recreate_venv = force_recreate_venv - - async def run( - self, agent_state: Optional[AgentState] = None, additional_env_vars: Optional[Dict] = None, inject_agent_state: bool = False - ) -> SandboxRunResult: - """ - Run the tool in a sandbox environment asynchronously, - *always* using a subprocess for execution. - """ - result = await self.run_local_dir_sandbox( - agent_state=agent_state, additional_env_vars=additional_env_vars, inject_agent_state=inject_agent_state - ) - - # Simple console logging for demonstration - for log_line in (result.stdout or []) + (result.stderr or []): - print(f"Tool execution log: {log_line}") - - return result - - @trace_method - async def run_local_dir_sandbox( - self, agent_state: Optional[AgentState], additional_env_vars: Optional[Dict], inject_agent_state: bool - ) -> SandboxRunResult: - """ - Unified asynchronougit pus method to run the tool in a local sandbox environment, - always via subprocess for multi-core parallelism. - """ - # Get sandbox configuration - sbx_config = self.sandbox_config_manager.get_or_create_default_sandbox_config(sandbox_type=SandboxType.LOCAL, actor=self.user) - local_configs = sbx_config.get_local_config() - use_venv = local_configs.use_venv - - # Prepare environment variables - env = os.environ.copy() - env_vars = self.sandbox_config_manager.get_sandbox_env_vars_as_dict(sandbox_config_id=sbx_config.id, actor=self.user, limit=100) - env.update(env_vars) - - if agent_state: - env.update(agent_state.get_agent_env_vars_as_dict()) - - if additional_env_vars: - env.update(additional_env_vars) - - # Make sure sandbox directory exists - sandbox_dir = os.path.expanduser(local_configs.sandbox_dir) - if not os.path.exists(sandbox_dir) or not os.path.isdir(sandbox_dir): - os.makedirs(sandbox_dir) - - # If using a virtual environment, ensure it's prepared in parallel - venv_preparation_task = None - if use_venv: - venv_path = str(os.path.join(sandbox_dir, local_configs.venv_name)) - if self.force_recreate_venv or not os.path.isdir(venv_path): - venv_preparation_task = asyncio.create_task(self._prepare_venv(local_configs, venv_path, env)) - - # Generate and write execution script (always with markers, since we rely on stdout) - with tempfile.NamedTemporaryFile(mode="w", dir=sandbox_dir, suffix=".py", delete=False) as temp_file: - code = self.generate_execution_script(agent_state=agent_state, inject_agent_state=inject_agent_state) - temp_file.write(code) - temp_file.flush() - temp_file_path = temp_file.name - - try: - # If we started a venv preparation task, wait for it to complete - if venv_preparation_task: - await venv_preparation_task - - # Determine the python executable and environment for the subprocess - exec_env = env.copy() - if use_venv: - venv_path = str(os.path.join(sandbox_dir, local_configs.venv_name)) - python_executable = find_python_executable(local_configs) - exec_env["VIRTUAL_ENV"] = venv_path - exec_env["PATH"] = os.path.join(venv_path, "bin") + ":" + exec_env["PATH"] - else: - # If not using venv, use whatever Python we are running on - python_executable = sys.executable - - exec_env["PYTHONWARNINGS"] = "ignore" - - # Execute in subprocess - return await self._execute_tool_subprocess( - sbx_config=sbx_config, - python_executable=python_executable, - temp_file_path=temp_file_path, - env=exec_env, - cwd=sandbox_dir, - ) - - except Exception as e: - print(f"Executing tool {self.tool_name} has an unexpected error: {e}") - print(f"Auto-generated code for debugging:\n\n{code}") - raise e - finally: - # Clean up the temp file - os.remove(temp_file_path) - - async def _prepare_venv(self, local_configs, venv_path: str, env: Dict[str, str]): - """ - Prepare virtual environment asynchronously (in a background thread). - """ - sandbox_dir = os.path.expanduser(local_configs.sandbox_dir) - log_event(name="start create_venv_for_local_sandbox", attributes={"venv_path": venv_path}) - - await asyncio.to_thread( - create_venv_for_local_sandbox, - sandbox_dir_path=sandbox_dir, - venv_path=venv_path, - env=env, - force_recreate=self.force_recreate_venv, - ) - log_event(name="finish create_venv_for_local_sandbox") - - log_event(name="start install_pip_requirements_for_sandbox", attributes={"local_configs": local_configs.model_dump_json()}) - await asyncio.to_thread(install_pip_requirements_for_sandbox, local_configs, upgrade=True, user_install_if_no_venv=False, env=env) - log_event(name="finish install_pip_requirements_for_sandbox", attributes={"local_configs": local_configs.model_dump_json()}) - - @trace_method - async def _execute_tool_subprocess( - self, sbx_config, python_executable: str, temp_file_path: str, env: Dict[str, str], cwd: str - ) -> SandboxRunResult: - """ - Execute user code in a subprocess, always capturing stdout and stderr. - We parse special markers to extract the pickled result string. - """ - try: - log_event(name="start subprocess") - - process = await asyncio.create_subprocess_exec( - python_executable, temp_file_path, env=env, cwd=cwd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE - ) - - try: - stdout_bytes, stderr_bytes = await asyncio.wait_for(process.communicate(), timeout=60) - except asyncio.TimeoutError: - # Terminate the process on timeout - if process.returncode is None: - process.terminate() - try: - await asyncio.wait_for(process.wait(), timeout=5) - except asyncio.TimeoutError: - process.kill() - - raise TimeoutError(f"Executing tool {self.tool_name} timed out after 60 seconds.") - - stdout = stdout_bytes.decode("utf-8") if stdout_bytes else "" - stderr = stderr_bytes.decode("utf-8") if stderr_bytes else "" - log_event(name="finish subprocess") - - # Parse markers to isolate the function result - func_result, stdout_text = self.parse_out_function_results_markers(stdout) - func_return, agent_state = self.parse_best_effort(func_result) - - return SandboxRunResult( - func_return=func_return, - agent_state=agent_state, - stdout=[stdout_text] if stdout_text else [], - stderr=[stderr] if stderr else [], - status="success" if process.returncode == 0 else "error", - sandbox_config_fingerprint=sbx_config.fingerprint(), - ) - - except (TimeoutError, Exception) as e: - # Distinguish between timeouts and other exceptions for clarity - if isinstance(e, TimeoutError): - raise e - - print(f"Subprocess execution for tool {self.tool_name} encountered an error: {e}") - func_return = get_friendly_error_msg( - function_name=self.tool_name, - exception_name=type(e).__name__, - exception_message=str(e), - ) - return SandboxRunResult( - func_return=func_return, - agent_state=None, - stdout=[], - stderr=[str(e)], - status="error", - sandbox_config_fingerprint=sbx_config.fingerprint(), - ) - - def parse_out_function_results_markers(self, text: str) -> Tuple[str, str]: - """ - Parse the function results out of the stdout using special markers. - Returns (function_result_str, stripped_stdout). - """ - if self.LOCAL_SANDBOX_RESULT_START_MARKER not in text: - # No markers found, so nothing to parse - return "", text - - marker_len = len(self.LOCAL_SANDBOX_RESULT_START_MARKER) - start_index = text.index(self.LOCAL_SANDBOX_RESULT_START_MARKER) + marker_len - end_index = text.index(self.LOCAL_SANDBOX_RESULT_END_MARKER) - - # The actual pickled base64 is between start_index and end_index - results_str = text[start_index:end_index] - # The rest of stdout (minus the markers) - remainder = text[: start_index - marker_len] + text[end_index + marker_len :] - return results_str, remainder - - def parse_best_effort(self, text: str) -> Tuple[Any, Optional[AgentState]]: - """ - Decode and unpickle the result from the function execution if possible. - Returns (function_return_value, agent_state). - """ - if not text: - return None, None - - result = pickle.loads(base64.b64decode(text)) - agent_state = result["agent_state"] if result["agent_state"] is not None else None - return result["results"], agent_state - - def parse_function_arguments(self, source_code: str, tool_name: str) -> list: - """ - Get arguments of the given function from its source code via AST. - """ - tree = ast.parse(source_code) - args = [] - for node in ast.walk(tree): - if isinstance(node, ast.FunctionDef) and node.name == tool_name: - for arg in node.args.args: - args.append(arg.arg) - return args - - def generate_execution_script(self, agent_state: Optional[AgentState], inject_agent_state: bool) -> str: - """ - Generate code to run inside of execution sandbox. - Serialize the agent state and arguments, call the tool, - then base64-encode/pickle the result. - """ - code = "from typing import *\n" - code += "import pickle\n" - code += "import sys\n" - code += "import base64\n" - - # Additional imports to support agent state - if inject_agent_state: - code += "import letta\n" - code += "from letta import * \n" - - # Add schema code if available - if self.tool.args_json_schema: - schema_code = add_imports_and_pydantic_schemas_for_args(self.tool.args_json_schema) - if "from __future__ import annotations" in schema_code: - schema_code = schema_code.replace("from __future__ import annotations", "").lstrip() - code = "from __future__ import annotations\n\n" + code - code += schema_code + "\n" - - # Load the agent state - if inject_agent_state: - agent_state_pickle = pickle.dumps(agent_state) - code += f"agent_state = pickle.loads({agent_state_pickle})\n" - else: - code += "agent_state = None\n" - - # Initialize arguments - if self.tool.args_json_schema: - args_schema = generate_model_from_args_json_schema(self.tool.args_json_schema) - code += f"args_object = {args_schema.__name__}(**{self.args})\n" - for param in self.args: - code += f"{param} = args_object.{param}\n" - else: - for param in self.args: - code += self.initialize_param(param, self.args[param]) - - # Insert the tool's source code - code += "\n" + self.tool.source_code + "\n" - - # Invoke the function and store the result in a global variable - code += ( - f"{self.LOCAL_SANDBOX_RESULT_VAR_NAME}" - + ' = {"results": ' - + self.invoke_function_call(inject_agent_state=inject_agent_state) - + ', "agent_state": agent_state}\n' - ) - code += ( - f"{self.LOCAL_SANDBOX_RESULT_VAR_NAME} = base64.b64encode(" - f"pickle.dumps({self.LOCAL_SANDBOX_RESULT_VAR_NAME})" - ").decode('utf-8')\n" - ) - - # If we're always in a subprocess, we must rely on markers to parse out the result - code += f"sys.stdout.write('{self.LOCAL_SANDBOX_RESULT_START_MARKER}')\n" - code += f"sys.stdout.write(str({self.LOCAL_SANDBOX_RESULT_VAR_NAME}))\n" - code += f"sys.stdout.write('{self.LOCAL_SANDBOX_RESULT_END_MARKER}')\n" - - return code - - def _convert_param_to_value(self, param_type: str, raw_value: str) -> str: - """ - Convert parameter to Python code representation based on JSON schema type. - """ - if param_type == "string": - # Safely inject a Python string via pickle - value = "pickle.loads(" + str(pickle.dumps(raw_value)) + ")" - elif param_type in ["integer", "boolean", "number", "array", "object"]: - # This is simplistic. In real usage, ensure correct type-casting or sanitization. - value = raw_value - else: - raise TypeError(f"Unsupported type: {param_type}, raw_value={raw_value}") - - return str(value) - - def initialize_param(self, name: str, raw_value: str) -> str: - """ - Produce code for initializing a single parameter in the generated script. - """ - params = self.tool.json_schema["parameters"]["properties"] - spec = params.get(name) - if spec is None: - # Possibly an extra param like 'self' that we ignore - return "" - - param_type = spec.get("type") - if param_type is None and spec.get("parameters"): - param_type = spec["parameters"].get("type") - - value = self._convert_param_to_value(param_type, raw_value) - return f"{name} = {value}\n" - - def invoke_function_call(self, inject_agent_state: bool) -> str: - """ - Generate the function call code string with the appropriate arguments. - """ - kwargs = [] - for name in self.args: - if name in self.tool.json_schema["parameters"]["properties"]: - kwargs.append(name) - - param_list = [f"{arg}={arg}" for arg in kwargs] - if inject_agent_state: - param_list.append("agent_state=agent_state") - - params = ", ".join(param_list) - func_call_str = self.tool.name + "(" + params + ")" - return func_call_str diff --git a/letta/services/tool_executor/tool_execution_sandbox.py b/letta/services/tool_executor/tool_execution_sandbox.py index 9b285d48..4a82fc13 100644 --- a/letta/services/tool_executor/tool_execution_sandbox.py +++ b/letta/services/tool_executor/tool_execution_sandbox.py @@ -387,9 +387,7 @@ class ToolExecutionSandbox: sbx = Sandbox(sandbox_config.get_e2b_config().template, metadata={self.METADATA_CONFIG_STATE_KEY: state_hash}) else: # no template - sbx = Sandbox( - metadata={self.METADATA_CONFIG_STATE_KEY: state_hash}, **e2b_config.model_dump(to_orm=True, exclude={"pip_requirements"}) - ) + sbx = Sandbox(metadata={self.METADATA_CONFIG_STATE_KEY: state_hash}, **e2b_config.model_dump(exclude={"pip_requirements"})) # install pip requirements if e2b_config.pip_requirements: diff --git a/letta/services/tool_executor/tool_executor.py b/letta/services/tool_executor/tool_executor.py index 69345b00..d4b8f444 100644 --- a/letta/services/tool_executor/tool_executor.py +++ b/letta/services/tool_executor/tool_executor.py @@ -1,4 +1,3 @@ -import ast import math from abc import ABC, abstractmethod from typing import Any, Optional, Tuple @@ -15,7 +14,9 @@ from letta.schemas.user import User from letta.services.agent_manager import AgentManager from letta.services.message_manager import MessageManager from letta.services.passage_manager import PassageManager -from letta.services.tool_executor.async_tool_execution_sandbox import AsyncToolExecutionSandbox +from letta.services.tool_sandbox.e2b_sandbox import AsyncToolSandboxE2B +from letta.services.tool_sandbox.local_sandbox import AsyncToolSandboxLocal +from letta.settings import tool_settings from letta.utils import get_friendly_error_msg @@ -334,16 +335,13 @@ class SandboxToolExecutor(ToolExecutor): agent_state_copy = self._create_agent_state_copy(agent_state) - # TODO: This is brittle, think about better way to do this? - if "agent_state" in self.parse_function_arguments(tool.source_code, tool.name): - inject_agent_state = True + # Execute in sandbox depending on API key + if tool_settings.e2b_api_key: + sandbox = AsyncToolSandboxE2B(function_name, function_args, actor, tool_object=tool) else: - inject_agent_state = False + sandbox = AsyncToolSandboxLocal(function_name, function_args, actor, tool_object=tool) - # Execute in sandbox - sandbox_run_result = await AsyncToolExecutionSandbox(function_name, function_args, actor, tool_object=tool).run( - agent_state=agent_state_copy, inject_agent_state=inject_agent_state - ) + sandbox_run_result = await sandbox.run(agent_state=agent_state_copy) function_response, updated_agent_state = sandbox_run_result.func_return, sandbox_run_result.agent_state @@ -371,16 +369,6 @@ class SandboxToolExecutor(ToolExecutor): # This is defensive programming - we try to coerce but fall back if it fails return function_args - def parse_function_arguments(self, source_code: str, tool_name: str): - """Get arguments of a function from its source code""" - tree = ast.parse(source_code) - args = [] - for node in ast.walk(tree): - if isinstance(node, ast.FunctionDef) and node.name == tool_name: - for arg in node.args.args: - args.append(arg.arg) - return args - def _create_agent_state_copy(self, agent_state: AgentState): """Create a copy of agent state for sandbox execution.""" agent_state_copy = agent_state.__deepcopy__() diff --git a/letta/services/tool_sandbox/__init__.py b/letta/services/tool_sandbox/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/letta/services/tool_sandbox/base.py b/letta/services/tool_sandbox/base.py new file mode 100644 index 00000000..918a616b --- /dev/null +++ b/letta/services/tool_sandbox/base.py @@ -0,0 +1,190 @@ +import ast +import base64 +import pickle +import uuid +from abc import ABC, abstractmethod +from typing import Any, Dict, Optional, Tuple + +from letta.functions.helpers import generate_model_from_args_json_schema +from letta.schemas.agent import AgentState +from letta.schemas.sandbox_config import SandboxRunResult +from letta.services.helpers.tool_execution_helper import add_imports_and_pydantic_schemas_for_args +from letta.services.organization_manager import OrganizationManager +from letta.services.sandbox_config_manager import SandboxConfigManager +from letta.services.tool_manager import ToolManager + + +class AsyncToolSandboxBase(ABC): + NAMESPACE = uuid.NAMESPACE_DNS + LOCAL_SANDBOX_RESULT_START_MARKER = str(uuid.uuid5(NAMESPACE, "local-sandbox-result-start-marker")) + LOCAL_SANDBOX_RESULT_END_MARKER = str(uuid.uuid5(NAMESPACE, "local-sandbox-result-end-marker")) + LOCAL_SANDBOX_RESULT_VAR_NAME = "result_ZQqiequkcFwRwwGQMqkt" + + def __init__(self, tool_name: str, args: dict, user, tool_object=None): + self.tool_name = tool_name + self.args = args + self.user = user + self.organization = OrganizationManager().get_organization_by_id(self.user.organization_id) + self.privileged_tools = self.organization.privileged_tools + + self.tool = tool_object or ToolManager().get_tool_by_name(tool_name=tool_name, actor=self.user) + if self.tool is None: + raise ValueError( + f"Agent attempted to invoke tool {self.tool_name} that does not exist for organization {self.user.organization_id}" + ) + + self.sandbox_config_manager = SandboxConfigManager() + + # See if we should inject agent_state or not based on the presence of the "agent_state" arg + if "agent_state" in self.parse_function_arguments(self.tool.source_code, self.tool.name): + self.inject_agent_state = True + else: + self.inject_agent_state = False + + @abstractmethod + async def run( + self, + agent_state: Optional[AgentState] = None, + additional_env_vars: Optional[Dict] = None, + ) -> SandboxRunResult: + """ + Run the tool in a sandbox environment asynchronously. + Must be implemented by subclasses. + """ + raise NotImplementedError + + def generate_execution_script(self, agent_state: Optional[AgentState], wrap_print_with_markers: bool = False) -> str: + """ + Generate code to run inside of execution sandbox. + Serialize the agent state and arguments, call the tool, + then base64-encode/pickle the result. + """ + code = "from typing import *\n" + code += "import pickle\n" + code += "import sys\n" + code += "import base64\n" + + # Additional imports to support agent state + if self.inject_agent_state: + code += "import letta\n" + code += "from letta import * \n" + + # Add schema code if available + if self.tool.args_json_schema: + schema_code = add_imports_and_pydantic_schemas_for_args(self.tool.args_json_schema) + if "from __future__ import annotations" in schema_code: + schema_code = schema_code.replace("from __future__ import annotations", "").lstrip() + code = "from __future__ import annotations\n\n" + code + code += schema_code + "\n" + + # Load the agent state + if self.inject_agent_state: + agent_state_pickle = pickle.dumps(agent_state) + code += f"agent_state = pickle.loads({agent_state_pickle})\n" + else: + code += "agent_state = None\n" + + # Initialize arguments + if self.tool.args_json_schema: + args_schema = generate_model_from_args_json_schema(self.tool.args_json_schema) + code += f"args_object = {args_schema.__name__}(**{self.args})\n" + for param in self.args: + code += f"{param} = args_object.{param}\n" + else: + for param in self.args: + code += self.initialize_param(param, self.args[param]) + + # Insert the tool's source code + code += "\n" + self.tool.source_code + "\n" + + # Invoke the function and store the result in a global variable + code += ( + f"{self.LOCAL_SANDBOX_RESULT_VAR_NAME}" + ' = {"results": ' + self.invoke_function_call() + ', "agent_state": agent_state}\n' + ) + code += ( + f"{self.LOCAL_SANDBOX_RESULT_VAR_NAME} = base64.b64encode(" + f"pickle.dumps({self.LOCAL_SANDBOX_RESULT_VAR_NAME})" + ").decode('utf-8')\n" + ) + + if wrap_print_with_markers: + code += f"sys.stdout.write('{self.LOCAL_SANDBOX_RESULT_START_MARKER}')\n" + code += f"sys.stdout.write(str({self.LOCAL_SANDBOX_RESULT_VAR_NAME}))\n" + code += f"sys.stdout.write('{self.LOCAL_SANDBOX_RESULT_END_MARKER}')\n" + else: + code += f"{self.LOCAL_SANDBOX_RESULT_VAR_NAME}\n" + + return code + + def _convert_param_to_value(self, param_type: str, raw_value: str) -> str: + """ + Convert parameter to Python code representation based on JSON schema type. + """ + if param_type == "string": + # Safely inject a Python string via pickle + value = "pickle.loads(" + str(pickle.dumps(raw_value)) + ")" + elif param_type in ["integer", "boolean", "number", "array", "object"]: + # This is simplistic. In real usage, ensure correct type-casting or sanitization. + value = raw_value + else: + raise TypeError(f"Unsupported type: {param_type}, raw_value={raw_value}") + + return str(value) + + def initialize_param(self, name: str, raw_value: str) -> str: + """ + Produce code for initializing a single parameter in the generated script. + """ + params = self.tool.json_schema["parameters"]["properties"] + spec = params.get(name) + if spec is None: + # Possibly an extra param like 'self' that we ignore + return "" + + param_type = spec.get("type") + if param_type is None and spec.get("parameters"): + param_type = spec["parameters"].get("type") + + value = self._convert_param_to_value(param_type, raw_value) + return f"{name} = {value}\n" + + def invoke_function_call(self) -> str: + """ + Generate the function call code string with the appropriate arguments. + """ + kwargs = [] + for name in self.args: + if name in self.tool.json_schema["parameters"]["properties"]: + kwargs.append(name) + + param_list = [f"{arg}={arg}" for arg in kwargs] + if self.inject_agent_state: + param_list.append("agent_state=agent_state") + + params = ", ".join(param_list) + func_call_str = self.tool.name + "(" + params + ")" + return func_call_str + + def parse_best_effort(self, text: str) -> Tuple[Any, Optional[AgentState]]: + """ + Decode and unpickle the result from the function execution if possible. + Returns (function_return_value, agent_state). + """ + if not text: + return None, None + + result = pickle.loads(base64.b64decode(text)) + print("LOOK HERE!") + print(result) + agent_state = result["agent_state"] + return result["results"], agent_state + + def parse_function_arguments(self, source_code: str, tool_name: str): + """Get arguments of a function from its source code""" + tree = ast.parse(source_code) + args = [] + for node in ast.walk(tree): + if isinstance(node, ast.FunctionDef) and node.name == tool_name: + for arg in node.args.args: + args.append(arg.arg) + return args diff --git a/letta/services/tool_sandbox/e2b_sandbox.py b/letta/services/tool_sandbox/e2b_sandbox.py new file mode 100644 index 00000000..59d545a3 --- /dev/null +++ b/letta/services/tool_sandbox/e2b_sandbox.py @@ -0,0 +1,116 @@ +from typing import Dict, Optional + +from letta.log import get_logger +from letta.schemas.agent import AgentState +from letta.schemas.sandbox_config import SandboxConfig, SandboxRunResult, SandboxType +from letta.services.tool_sandbox.base import AsyncToolSandboxBase +from letta.utils import get_friendly_error_msg + +logger = get_logger(__name__) + + +class AsyncToolSandboxE2B(AsyncToolSandboxBase): + METADATA_CONFIG_STATE_KEY = "config_state" + + def __init__(self, tool_name: str, args: dict, user, force_recreate=True, tool_object=None): + super().__init__(tool_name, args, user, tool_object) + self.force_recreate = force_recreate + + async def run( + self, + agent_state: Optional[AgentState] = None, + additional_env_vars: Optional[Dict] = None, + ) -> SandboxRunResult: + """ + Run the tool in a sandbox environment asynchronously, + *always* using a subprocess for execution. + """ + result = await self.run_e2b_sandbox(agent_state=agent_state, additional_env_vars=additional_env_vars) + + # Simple console logging for demonstration + for log_line in (result.stdout or []) + (result.stderr or []): + print(f"Tool execution log: {log_line}") + + return result + + async def run_e2b_sandbox( + self, agent_state: Optional[AgentState] = None, additional_env_vars: Optional[Dict] = None + ) -> SandboxRunResult: + sbx_config = self.sandbox_config_manager.get_or_create_default_sandbox_config(sandbox_type=SandboxType.E2B, actor=self.user) + # TODO: So this defaults to force recreating always + # TODO: Eventually, provision one sandbox PER agent, and that agent re-uses that one specifically + e2b_sandbox = await self.create_e2b_sandbox_with_metadata_hash(sandbox_config=sbx_config) + + logger.info(f"E2B Sandbox configurations: {sbx_config}") + logger.info(f"E2B Sandbox ID: {e2b_sandbox.sandbox_id}") + + # TODO: This only makes sense if we re-use sandboxes + # # Since this sandbox was used, we extend its lifecycle by the timeout + # await sbx.set_timeout(sbx_config.get_e2b_config().timeout) + + # Get environment variables for the sandbox + # TODO: We set limit to 100 here, but maybe we want it uncapped? Realistically this should be fine. + env_vars = self.sandbox_config_manager.get_sandbox_env_vars_as_dict(sandbox_config_id=sbx_config.id, actor=self.user, limit=100) + # Get environment variables for this agent specifically + if agent_state: + env_vars.update(agent_state.get_agent_env_vars_as_dict()) + + # Finally, get any that are passed explicitly into the `run` function call + if additional_env_vars: + env_vars.update(additional_env_vars) + code = self.generate_execution_script(agent_state=agent_state) + + execution = await e2b_sandbox.run_code(code, envs=env_vars) + + if execution.results: + func_return, agent_state = self.parse_best_effort(execution.results[0].text) + elif execution.error: + logger.error(f"Executing tool {self.tool_name} raised a {execution.error.name} with message: \n{execution.error.value}") + logger.error(f"Traceback from e2b sandbox: \n{execution.error.traceback}") + func_return = get_friendly_error_msg( + function_name=self.tool_name, exception_name=execution.error.name, exception_message=execution.error.value + ) + execution.logs.stderr.append(execution.error.traceback) + else: + raise ValueError(f"Tool {self.tool_name} returned execution with None") + + return SandboxRunResult( + func_return=func_return, + agent_state=agent_state, + stdout=execution.logs.stdout, + stderr=execution.logs.stderr, + status="error" if execution.error else "success", + sandbox_config_fingerprint=sbx_config.fingerprint(), + ) + + def parse_exception_from_e2b_execution(self, e2b_execution: "Execution") -> Exception: + builtins_dict = __builtins__ if isinstance(__builtins__, dict) else vars(__builtins__) + # Dynamically fetch the exception class from builtins, defaulting to Exception if not found + exception_class = builtins_dict.get(e2b_execution.error.name, Exception) + return exception_class(e2b_execution.error.value) + + async def create_e2b_sandbox_with_metadata_hash(self, sandbox_config: SandboxConfig) -> "Sandbox": + from e2b_code_interpreter import AsyncSandbox + + state_hash = sandbox_config.fingerprint() + e2b_config = sandbox_config.get_e2b_config() + + if e2b_config.template: + sbx = await AsyncSandbox.create(sandbox_config.get_e2b_config().template, metadata={self.METADATA_CONFIG_STATE_KEY: state_hash}) + else: + # no template + sbx = await AsyncSandbox.create( + metadata={self.METADATA_CONFIG_STATE_KEY: state_hash}, **e2b_config.model_dump(exclude={"pip_requirements"}) + ) + + # install pip requirements + if e2b_config.pip_requirements: + for package in e2b_config.pip_requirements: + await sbx.commands.run(f"pip install {package}") + return sbx + + async def list_running_e2b_sandboxes(self): + from e2b_code_interpreter import AsyncSandbox + + # List running sandboxes and access metadata. + return await AsyncSandbox.list() diff --git a/letta/services/tool_sandbox/local_sandbox.py b/letta/services/tool_sandbox/local_sandbox.py new file mode 100644 index 00000000..352f6de6 --- /dev/null +++ b/letta/services/tool_sandbox/local_sandbox.py @@ -0,0 +1,221 @@ +import asyncio +import os +import sys +import tempfile +from typing import Dict, Optional, Tuple + +from letta.schemas.agent import AgentState +from letta.schemas.sandbox_config import SandboxRunResult, SandboxType +from letta.services.helpers.tool_execution_helper import ( + create_venv_for_local_sandbox, + find_python_executable, + install_pip_requirements_for_sandbox, +) +from letta.services.tool_sandbox.base import AsyncToolSandboxBase +from letta.tracing import log_event, trace_method +from letta.utils import get_friendly_error_msg + + +class AsyncToolSandboxLocal(AsyncToolSandboxBase): + METADATA_CONFIG_STATE_KEY = "config_state" + REQUIREMENT_TXT_NAME = "requirements.txt" + + def __init__(self, tool_name: str, args: dict, user, force_recreate_venv=False, tool_object=None): + super().__init__(tool_name, args, user, tool_object) + self.force_recreate_venv = force_recreate_venv + + async def run( + self, + agent_state: Optional[AgentState] = None, + additional_env_vars: Optional[Dict] = None, + ) -> SandboxRunResult: + """ + Run the tool in a sandbox environment asynchronously, + *always* using a subprocess for execution. + """ + result = await self.run_local_dir_sandbox(agent_state=agent_state, additional_env_vars=additional_env_vars) + + # Simple console logging for demonstration + for log_line in (result.stdout or []) + (result.stderr or []): + print(f"Tool execution log: {log_line}") + + return result + + @trace_method + async def run_local_dir_sandbox(self, agent_state: Optional[AgentState], additional_env_vars: Optional[Dict]) -> SandboxRunResult: + """ + Unified asynchronougit pus method to run the tool in a local sandbox environment, + always via subprocess for multi-core parallelism. + """ + # Get sandbox configuration + sbx_config = self.sandbox_config_manager.get_or_create_default_sandbox_config(sandbox_type=SandboxType.LOCAL, actor=self.user) + local_configs = sbx_config.get_local_config() + use_venv = local_configs.use_venv + + # Prepare environment variables + env = os.environ.copy() + env_vars = self.sandbox_config_manager.get_sandbox_env_vars_as_dict(sandbox_config_id=sbx_config.id, actor=self.user, limit=100) + env.update(env_vars) + + if agent_state: + env.update(agent_state.get_agent_env_vars_as_dict()) + + if additional_env_vars: + env.update(additional_env_vars) + + # Make sure sandbox directory exists + sandbox_dir = os.path.expanduser(local_configs.sandbox_dir) + if not os.path.exists(sandbox_dir) or not os.path.isdir(sandbox_dir): + os.makedirs(sandbox_dir) + + # If using a virtual environment, ensure it's prepared in parallel + venv_preparation_task = None + if use_venv: + venv_path = str(os.path.join(sandbox_dir, local_configs.venv_name)) + venv_preparation_task = asyncio.create_task(self._prepare_venv(local_configs, venv_path, env)) + + # Generate and write execution script (always with markers, since we rely on stdout) + with tempfile.NamedTemporaryFile(mode="w", dir=sandbox_dir, suffix=".py", delete=False) as temp_file: + code = self.generate_execution_script(agent_state=agent_state, wrap_print_with_markers=True) + temp_file.write(code) + temp_file.flush() + temp_file_path = temp_file.name + + try: + # If we started a venv preparation task, wait for it to complete + if venv_preparation_task: + await venv_preparation_task + + # Determine the python executable and environment for the subprocess + exec_env = env.copy() + if use_venv: + venv_path = str(os.path.join(sandbox_dir, local_configs.venv_name)) + python_executable = find_python_executable(local_configs) + exec_env["VIRTUAL_ENV"] = venv_path + exec_env["PATH"] = os.path.join(venv_path, "bin") + ":" + exec_env["PATH"] + else: + # If not using venv, use whatever Python we are running on + python_executable = sys.executable + + exec_env["PYTHONWARNINGS"] = "ignore" + + # Execute in subprocess + return await self._execute_tool_subprocess( + sbx_config=sbx_config, + python_executable=python_executable, + temp_file_path=temp_file_path, + env=exec_env, + cwd=sandbox_dir, + ) + + except Exception as e: + print(f"Executing tool {self.tool_name} has an unexpected error: {e}") + print(f"Auto-generated code for debugging:\n\n{code}") + raise e + finally: + # Clean up the temp file + os.remove(temp_file_path) + + async def _prepare_venv(self, local_configs, venv_path: str, env: Dict[str, str]): + """ + Prepare virtual environment asynchronously (in a background thread). + """ + if self.force_recreate_venv or not os.path.isdir(venv_path): + sandbox_dir = os.path.expanduser(local_configs.sandbox_dir) + log_event(name="start create_venv_for_local_sandbox", attributes={"venv_path": venv_path}) + await asyncio.to_thread( + create_venv_for_local_sandbox, + sandbox_dir_path=sandbox_dir, + venv_path=venv_path, + env=env, + force_recreate=self.force_recreate_venv, + ) + log_event(name="finish create_venv_for_local_sandbox") + + log_event(name="start install_pip_requirements_for_sandbox", attributes={"local_configs": local_configs.model_dump_json()}) + await asyncio.to_thread(install_pip_requirements_for_sandbox, local_configs, upgrade=True, user_install_if_no_venv=False, env=env) + log_event(name="finish install_pip_requirements_for_sandbox", attributes={"local_configs": local_configs.model_dump_json()}) + + @trace_method + async def _execute_tool_subprocess( + self, sbx_config, python_executable: str, temp_file_path: str, env: Dict[str, str], cwd: str + ) -> SandboxRunResult: + """ + Execute user code in a subprocess, always capturing stdout and stderr. + We parse special markers to extract the pickled result string. + """ + try: + log_event(name="start subprocess") + + process = await asyncio.create_subprocess_exec( + python_executable, temp_file_path, env=env, cwd=cwd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + ) + + try: + stdout_bytes, stderr_bytes = await asyncio.wait_for(process.communicate(), timeout=60) + except asyncio.TimeoutError: + # Terminate the process on timeout + if process.returncode is None: + process.terminate() + try: + await asyncio.wait_for(process.wait(), timeout=5) + except asyncio.TimeoutError: + process.kill() + + raise TimeoutError(f"Executing tool {self.tool_name} timed out after 60 seconds.") + + stdout = stdout_bytes.decode("utf-8") if stdout_bytes else "" + stderr = stderr_bytes.decode("utf-8") if stderr_bytes else "" + log_event(name="finish subprocess") + + # Parse markers to isolate the function result + func_result, stdout_text = self.parse_out_function_results_markers(stdout) + func_return, agent_state = self.parse_best_effort(func_result) + + return SandboxRunResult( + func_return=func_return, + agent_state=agent_state, + stdout=[stdout_text] if stdout_text else [], + stderr=[stderr] if stderr else [], + status="success" if process.returncode == 0 else "error", + sandbox_config_fingerprint=sbx_config.fingerprint(), + ) + + except (TimeoutError, Exception) as e: + # Distinguish between timeouts and other exceptions for clarity + if isinstance(e, TimeoutError): + raise e + + print(f"Subprocess execution for tool {self.tool_name} encountered an error: {e}") + func_return = get_friendly_error_msg( + function_name=self.tool_name, + exception_name=type(e).__name__, + exception_message=str(e), + ) + return SandboxRunResult( + func_return=func_return, + agent_state=None, + stdout=[], + stderr=[str(e)], + status="error", + sandbox_config_fingerprint=sbx_config.fingerprint(), + ) + + def parse_out_function_results_markers(self, text: str) -> Tuple[str, str]: + """ + Parse the function results out of the stdout using special markers. + Returns (function_result_str, stripped_stdout). + """ + if self.LOCAL_SANDBOX_RESULT_START_MARKER not in text: + # No markers found, so nothing to parse + return "", text + + marker_len = len(self.LOCAL_SANDBOX_RESULT_START_MARKER) + start_index = text.index(self.LOCAL_SANDBOX_RESULT_START_MARKER) + marker_len + end_index = text.index(self.LOCAL_SANDBOX_RESULT_END_MARKER) + + # The actual pickled base64 is between start_index and end_index + results_str = text[start_index:end_index] + # The rest of stdout (minus the markers) + remainder = text[: start_index - marker_len] + text[end_index + marker_len :] + return results_str, remainder diff --git a/tests/integration_test_async_tool_sandbox.py b/tests/integration_test_async_tool_sandbox.py new file mode 100644 index 00000000..11c64526 --- /dev/null +++ b/tests/integration_test_async_tool_sandbox.py @@ -0,0 +1,515 @@ +import secrets +import string +import uuid +from pathlib import Path +from unittest.mock import patch + +import pytest +from sqlalchemy import delete + +from letta import create_client +from letta.functions.function_sets.base import core_memory_append, core_memory_replace +from letta.orm.sandbox_config import SandboxConfig, SandboxEnvironmentVariable +from letta.schemas.agent import AgentState +from letta.schemas.embedding_config import EmbeddingConfig +from letta.schemas.environment_variables import AgentEnvironmentVariable, SandboxEnvironmentVariableCreate +from letta.schemas.llm_config import LLMConfig +from letta.schemas.memory import ChatMemory +from letta.schemas.organization import Organization +from letta.schemas.sandbox_config import E2BSandboxConfig, LocalSandboxConfig, PipRequirement, SandboxConfigCreate +from letta.schemas.user import User +from letta.services.organization_manager import OrganizationManager +from letta.services.sandbox_config_manager import SandboxConfigManager +from letta.services.tool_manager import ToolManager +from letta.services.tool_sandbox.e2b_sandbox import AsyncToolSandboxE2B +from letta.services.tool_sandbox.local_sandbox import AsyncToolSandboxLocal +from letta.services.user_manager import UserManager +from tests.helpers.utils import create_tool_from_func + +# Constants +namespace = uuid.NAMESPACE_DNS +org_name = str(uuid.uuid5(namespace, "test-tool-execution-sandbox-org")) +user_name = str(uuid.uuid5(namespace, "test-tool-execution-sandbox-user")) + + +# Fixtures +@pytest.fixture(autouse=True) +def clear_tables(): + """Fixture to clear the organization table before each test.""" + from letta.server.db import db_context + + with db_context() as session: + session.execute(delete(SandboxEnvironmentVariable)) + session.execute(delete(SandboxConfig)) + session.commit() # Commit the deletion + + +@pytest.fixture +def test_organization(): + """Fixture to create and return the default organization.""" + org = OrganizationManager().create_organization(Organization(name=org_name)) + yield org + + +@pytest.fixture +def test_user(test_organization): + """Fixture to create and return the default user within the default organization.""" + user = UserManager().create_user(User(name=user_name, organization_id=test_organization.id)) + yield user + + +@pytest.fixture +def add_integers_tool(test_user): + def add(x: int, y: int) -> int: + """ + Simple function that adds two integers. + + Parameters: + x (int): The first integer to add. + y (int): The second integer to add. + + Returns: + int: The result of adding x and y. + """ + return x + y + + tool = create_tool_from_func(add) + tool = ToolManager().create_or_update_tool(tool, test_user) + yield tool + + +@pytest.fixture +def cowsay_tool(test_user): + # This defines a tool for a package we definitely do NOT have in letta + # If this test passes, that means the tool was correctly executed in a separate Python environment + def cowsay() -> str: + """ + Simple function that uses the cowsay package to print out the secret word env variable. + + Returns: + str: The cowsay ASCII art. + """ + import os + + import cowsay + + cowsay.cow(os.getenv("secret_word")) + + tool = create_tool_from_func(cowsay) + tool = ToolManager().create_or_update_tool(tool, test_user) + yield tool + + +@pytest.fixture +def get_env_tool(test_user): + def get_env() -> str: + """ + Simple function that returns the secret word env variable. + + Returns: + str: The secret word + """ + import os + + secret_word = os.getenv("secret_word") + print(secret_word) + return secret_word + + tool = create_tool_from_func(get_env) + tool = ToolManager().create_or_update_tool(tool, test_user) + yield tool + + +@pytest.fixture +def get_warning_tool(test_user): + def warn_hello_world() -> str: + """ + Simple function that warns hello world. + + Returns: + str: hello world + """ + import warnings + + msg = "Hello World" + warnings.warn(msg) + return msg + + tool = create_tool_from_func(warn_hello_world) + tool = ToolManager().create_or_update_tool(tool, test_user) + yield tool + + +@pytest.fixture +def always_err_tool(test_user): + def error() -> str: + """ + Simple function that errors + + Returns: + str: not important + """ + # Raise a unusual error so we know it's from this function + print("Going to error now") + raise ZeroDivisionError("This is an intentionally weird division!") + + tool = create_tool_from_func(error) + tool = ToolManager().create_or_update_tool(tool, test_user) + yield tool + + +@pytest.fixture +def list_tool(test_user): + def create_list(): + """Simple function that returns a list""" + + return [1] * 5 + + tool = create_tool_from_func(create_list) + tool = ToolManager().create_or_update_tool(tool, test_user) + yield tool + + +@pytest.fixture +def clear_core_memory_tool(test_user): + def clear_memory(agent_state: "AgentState"): + """Clear the core memory""" + agent_state.memory.get_block("human").value = "" + agent_state.memory.get_block("persona").value = "" + + tool = create_tool_from_func(clear_memory) + tool = ToolManager().create_or_update_tool(tool, test_user) + yield tool + + +@pytest.fixture +def external_codebase_tool(test_user): + from tests.test_tool_sandbox.restaurant_management_system.adjust_menu_prices import adjust_menu_prices + + tool = create_tool_from_func(adjust_menu_prices) + tool = ToolManager().create_or_update_tool(tool, test_user) + yield tool + + +@pytest.fixture +def agent_state(): + client = create_client() + agent_state = client.create_agent( + memory=ChatMemory(persona="This is the persona", human="My name is Chad"), + embedding_config=EmbeddingConfig.default_config(provider="openai"), + llm_config=LLMConfig.default_config(model_name="gpt-4o-mini"), + ) + agent_state.tool_rules = [] + yield agent_state + + +@pytest.fixture +def custom_test_sandbox_config(test_user): + """ + Fixture to create a consistent local sandbox configuration for tests. + + Args: + test_user: The test user to be used for creating the sandbox configuration. + + Returns: + A tuple containing the SandboxConfigManager and the created sandbox configuration. + """ + # Create the SandboxConfigManager + manager = SandboxConfigManager() + + # Set the sandbox to be within the external codebase path and use a venv + external_codebase_path = str(Path(__file__).parent / "test_tool_sandbox" / "restaurant_management_system") + # tqdm is used in this codebase, but NOT in the requirements.txt, this tests that we can successfully install pip requirements + local_sandbox_config = LocalSandboxConfig( + sandbox_dir=external_codebase_path, use_venv=True, pip_requirements=[PipRequirement(name="tqdm")] + ) + + # Create the sandbox configuration + config_create = SandboxConfigCreate(config=local_sandbox_config.model_dump()) + + # Create or update the sandbox configuration + manager.create_or_update_sandbox_config(sandbox_config_create=config_create, actor=test_user) + + return manager, local_sandbox_config + + +# Tool-specific fixtures +@pytest.fixture +def core_memory_tools(test_user): + """Create all base tools for testing.""" + tools = {} + for func in [ + core_memory_replace, + core_memory_append, + ]: + tool = create_tool_from_func(func) + tool = ToolManager().create_or_update_tool(tool, test_user) + tools[func.__name__] = tool + yield tools + + +# Local sandbox tests + + +@pytest.mark.asyncio +@pytest.mark.local_sandbox +async def test_local_sandbox_default(mock_e2b_api_key_none, add_integers_tool, test_user): + args = {"x": 10, "y": 5} + + # Mock and assert correct pathway was invoked + with patch.object(AsyncToolSandboxLocal, "run_local_dir_sandbox") as mock_run_local_dir_sandbox: + sandbox = AsyncToolSandboxLocal(add_integers_tool.name, args, user=test_user) + await sandbox.run() + mock_run_local_dir_sandbox.assert_called_once() + + # Run again to get actual response + sandbox = AsyncToolSandboxLocal(add_integers_tool.name, args, user=test_user) + result = await sandbox.run() + assert result.func_return == args["x"] + args["y"] + + +@pytest.mark.asyncio +@pytest.mark.local_sandbox +async def test_local_sandbox_stateful_tool(mock_e2b_api_key_none, clear_core_memory_tool, test_user, agent_state): + args = {} + sandbox = AsyncToolSandboxLocal(clear_core_memory_tool.name, args, user=test_user) + result = await sandbox.run(agent_state=agent_state) + assert sandbox.inject_agent_state == True + assert result.agent_state.memory.get_block("human").value == "" + assert result.agent_state.memory.get_block("persona").value == "" + assert result.func_return is None + + +@pytest.mark.asyncio +@pytest.mark.local_sandbox +async def test_local_sandbox_with_list_rv(mock_e2b_api_key_none, list_tool, test_user): + sandbox = AsyncToolSandboxLocal(list_tool.name, {}, user=test_user) + result = await sandbox.run() + assert len(result.func_return) == 5 + + +@pytest.mark.asyncio +@pytest.mark.local_sandbox +async def test_local_sandbox_env(mock_e2b_api_key_none, get_env_tool, test_user): + manager = SandboxConfigManager() + sandbox_dir = str(Path(__file__).parent / "test_tool_sandbox") + config_create = SandboxConfigCreate(config=LocalSandboxConfig(sandbox_dir=sandbox_dir).model_dump()) + config = manager.create_or_update_sandbox_config(config_create, test_user) + + key = "secret_word" + long_random_string = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(20)) + manager.create_sandbox_env_var( + SandboxEnvironmentVariableCreate(key=key, value=long_random_string), sandbox_config_id=config.id, actor=test_user + ) + + sandbox = AsyncToolSandboxLocal(get_env_tool.name, {}, user=test_user) + result = await sandbox.run() + assert long_random_string in result.func_return + + +@pytest.mark.asyncio +@pytest.mark.local_sandbox +async def test_local_sandbox_per_agent_env(mock_e2b_api_key_none, get_env_tool, agent_state, test_user): + manager = SandboxConfigManager() + key = "secret_word" + sandbox_dir = str(Path(__file__).parent / "test_tool_sandbox") + config_create = SandboxConfigCreate(config=LocalSandboxConfig(sandbox_dir=sandbox_dir).model_dump()) + config = manager.create_or_update_sandbox_config(config_create, test_user) + + wrong_val = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(20)) + manager.create_sandbox_env_var(SandboxEnvironmentVariableCreate(key=key, value=wrong_val), sandbox_config_id=config.id, actor=test_user) + + correct_val = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(20)) + agent_state.tool_exec_environment_variables = [AgentEnvironmentVariable(key=key, value=correct_val, agent_id=agent_state.id)] + + sandbox = AsyncToolSandboxLocal(get_env_tool.name, {}, user=test_user) + result = await sandbox.run(agent_state=agent_state) + assert wrong_val not in result.func_return + assert correct_val in result.func_return + + +@pytest.mark.asyncio +@pytest.mark.local_sandbox +async def test_local_sandbox_external_codebase_with_venv( + mock_e2b_api_key_none, custom_test_sandbox_config, external_codebase_tool, test_user +): + args = {"percentage": 10} + sandbox = AsyncToolSandboxLocal(external_codebase_tool.name, args, user=test_user) + result = await sandbox.run() + assert result.func_return == "Price Adjustments:\nBurger: $8.99 -> $9.89\nFries: $2.99 -> $3.29\nSoda: $1.99 -> $2.19" + assert "Hello World" in result.stdout[0] + + +@pytest.mark.asyncio +@pytest.mark.local_sandbox +async def test_local_sandbox_with_venv_and_warnings_does_not_error( + mock_e2b_api_key_none, custom_test_sandbox_config, get_warning_tool, test_user +): + sandbox = AsyncToolSandboxLocal(get_warning_tool.name, {}, user=test_user) + result = await sandbox.run() + assert result.func_return == "Hello World" + + +@pytest.mark.asyncio +@pytest.mark.e2b_sandbox +async def test_local_sandbox_with_venv_errors(mock_e2b_api_key_none, custom_test_sandbox_config, always_err_tool, test_user): + sandbox = AsyncToolSandboxLocal(always_err_tool.name, {}, user=test_user) + result = await sandbox.run() + assert len(result.stdout) != 0 + assert "error" in result.stdout[0] + assert len(result.stderr) != 0 + assert "ZeroDivisionError: This is an intentionally weird division!" in result.stderr[0] + + +@pytest.mark.asyncio +@pytest.mark.e2b_sandbox +async def test_local_sandbox_with_venv_pip_installs_basic(mock_e2b_api_key_none, cowsay_tool, test_user): + manager = SandboxConfigManager() + config_create = SandboxConfigCreate( + config=LocalSandboxConfig(use_venv=True, pip_requirements=[PipRequirement(name="cowsay")]).model_dump() + ) + config = manager.create_or_update_sandbox_config(config_create, test_user) + + key = "secret_word" + long_random_string = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(20)) + manager.create_sandbox_env_var( + SandboxEnvironmentVariableCreate(key=key, value=long_random_string), sandbox_config_id=config.id, actor=test_user + ) + + sandbox = AsyncToolSandboxLocal(cowsay_tool.name, {}, user=test_user, force_recreate_venv=True) + result = await sandbox.run() + assert long_random_string in result.stdout[0] + + +@pytest.mark.asyncio +@pytest.mark.e2b_sandbox +async def test_local_sandbox_with_venv_pip_installs_with_update(mock_e2b_api_key_none, cowsay_tool, test_user): + manager = SandboxConfigManager() + config_create = SandboxConfigCreate(config=LocalSandboxConfig(use_venv=True).model_dump()) + config = manager.create_or_update_sandbox_config(config_create, test_user) + + key = "secret_word" + long_random_string = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(20)) + manager.create_sandbox_env_var( + SandboxEnvironmentVariableCreate(key=key, value=long_random_string), sandbox_config_id=config.id, actor=test_user + ) + + sandbox = AsyncToolSandboxLocal(cowsay_tool.name, {}, user=test_user, force_recreate_venv=True) + result = await sandbox.run() + assert len(result.stdout) == 0 + assert "No module named 'cowsay'" in result.stderr[0] + + config_create = SandboxConfigCreate( + config=LocalSandboxConfig(use_venv=True, pip_requirements=[PipRequirement(name="cowsay")]).model_dump() + ) + manager.create_or_update_sandbox_config(config_create, test_user) + + sandbox = AsyncToolSandboxLocal(cowsay_tool.name, {}, user=test_user, force_recreate_venv=False) + result = await sandbox.run() + assert long_random_string in result.stdout[0] + + +# E2B sandbox tests + + +@pytest.mark.asyncio +@pytest.mark.e2b_sandbox +async def test_e2b_sandbox_default(check_e2b_key_is_set, add_integers_tool, test_user): + args = {"x": 10, "y": 5} + + # Mock and assert correct pathway was invoked + with patch.object(AsyncToolSandboxE2B, "run_e2b_sandbox") as mock_run_local_dir_sandbox: + sandbox = AsyncToolSandboxE2B(add_integers_tool.name, args, user=test_user) + await sandbox.run() + mock_run_local_dir_sandbox.assert_called_once() + + # Run again to get actual response + sandbox = AsyncToolSandboxE2B(add_integers_tool.name, args, user=test_user) + result = await sandbox.run() + assert int(result.func_return) == args["x"] + args["y"] + + +@pytest.mark.asyncio +@pytest.mark.e2b_sandbox +async def test_e2b_sandbox_pip_installs(check_e2b_key_is_set, cowsay_tool, test_user): + manager = SandboxConfigManager() + config_create = SandboxConfigCreate(config=E2BSandboxConfig(pip_requirements=["cowsay"]).model_dump()) + config = manager.create_or_update_sandbox_config(config_create, test_user) + + key = "secret_word" + long_random_string = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(20)) + manager.create_sandbox_env_var( + SandboxEnvironmentVariableCreate(key=key, value=long_random_string), + sandbox_config_id=config.id, + actor=test_user, + ) + + sandbox = AsyncToolSandboxE2B(cowsay_tool.name, {}, user=test_user) + result = await sandbox.run() + assert long_random_string in result.stdout[0] + + +@pytest.mark.asyncio +@pytest.mark.e2b_sandbox +async def test_e2b_sandbox_stateful_tool(check_e2b_key_is_set, clear_core_memory_tool, test_user, agent_state): + sandbox = AsyncToolSandboxE2B(clear_core_memory_tool.name, {}, user=test_user) + result = await sandbox.run(agent_state=agent_state) + assert result.agent_state.memory.get_block("human").value == "" + assert result.agent_state.memory.get_block("persona").value == "" + assert result.func_return is None + + +@pytest.mark.asyncio +@pytest.mark.e2b_sandbox +async def test_e2b_sandbox_inject_env_var_existing_sandbox(check_e2b_key_is_set, get_env_tool, test_user): + manager = SandboxConfigManager() + config_create = SandboxConfigCreate(config=E2BSandboxConfig().model_dump()) + config = manager.create_or_update_sandbox_config(config_create, test_user) + + sandbox = AsyncToolSandboxE2B(get_env_tool.name, {}, user=test_user) + result = await sandbox.run() + assert result.func_return is None + + key = "secret_word" + long_random_string = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(20)) + manager.create_sandbox_env_var( + SandboxEnvironmentVariableCreate(key=key, value=long_random_string), + sandbox_config_id=config.id, + actor=test_user, + ) + + sandbox = AsyncToolSandboxE2B(get_env_tool.name, {}, user=test_user) + result = await sandbox.run() + assert long_random_string in result.func_return + + +@pytest.mark.asyncio +@pytest.mark.e2b_sandbox +async def test_e2b_sandbox_per_agent_env(check_e2b_key_is_set, get_env_tool, agent_state, test_user): + manager = SandboxConfigManager() + key = "secret_word" + wrong_val = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(20)) + correct_val = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(20)) + + config_create = SandboxConfigCreate(config=LocalSandboxConfig().model_dump()) + config = manager.create_or_update_sandbox_config(config_create, test_user) + manager.create_sandbox_env_var( + SandboxEnvironmentVariableCreate(key=key, value=wrong_val), + sandbox_config_id=config.id, + actor=test_user, + ) + + agent_state.tool_exec_environment_variables = [AgentEnvironmentVariable(key=key, value=correct_val, agent_id=agent_state.id)] + + sandbox = AsyncToolSandboxE2B(get_env_tool.name, {}, user=test_user) + result = await sandbox.run(agent_state=agent_state) + assert wrong_val not in result.func_return + assert correct_val in result.func_return + + +@pytest.mark.asyncio +@pytest.mark.e2b_sandbox +async def test_e2b_sandbox_with_list_rv(check_e2b_key_is_set, list_tool, test_user): + sandbox = AsyncToolSandboxE2B(list_tool.name, {}, user=test_user) + result = await sandbox.run() + assert len(result.func_return) == 5