feat: Support external codebases as a tool execution sandbox (#2159)
This commit is contained in:
@@ -4,9 +4,12 @@ import io
|
||||
import os
|
||||
import pickle
|
||||
import runpy
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from typing import Any, Optional
|
||||
import uuid
|
||||
import venv
|
||||
from typing import Any, Dict, Optional, TextIO
|
||||
|
||||
from letta.log import get_logger
|
||||
from letta.schemas.agent import AgentState
|
||||
@@ -24,6 +27,11 @@ class ToolExecutionSandbox:
|
||||
METADATA_CONFIG_STATE_KEY = "config_state"
|
||||
REQUIREMENT_TXT_NAME = "requirements.txt"
|
||||
|
||||
# For generating long, random marker hashes
|
||||
NAMESPACE = uuid.NAMESPACE_DNS
|
||||
LOCAL_SANDBOX_RESULT_START_MARKER = str(uuid.uuid5(NAMESPACE, "local-sandbox-result-start-marker"))
|
||||
LOCAL_SANDBOX_RESULT_END_MARKER = str(uuid.uuid5(NAMESPACE, "local-sandbox-result-end-marker"))
|
||||
|
||||
# This is the variable name in the auto-generated code that contains the function results
|
||||
# We make this a long random string to avoid collisions with any variables in the user's code
|
||||
LOCAL_SANDBOX_RESULT_VAR_NAME = "result_ZQqiequkcFwRwwGQMqkt"
|
||||
@@ -65,12 +73,10 @@ class ToolExecutionSandbox:
|
||||
"""
|
||||
if tool_settings.e2b_api_key:
|
||||
logger.debug(f"Using e2b sandbox to execute {self.tool_name}")
|
||||
code = self.generate_execution_script(agent_state=agent_state)
|
||||
result = self.run_e2b_sandbox(code=code)
|
||||
result = self.run_e2b_sandbox(agent_state=agent_state)
|
||||
else:
|
||||
logger.debug(f"Using local sandbox to execute {self.tool_name}")
|
||||
code = self.generate_execution_script(agent_state=agent_state)
|
||||
result = self.run_local_dir_sandbox(code=code)
|
||||
result = self.run_local_dir_sandbox(agent_state=agent_state)
|
||||
|
||||
# Log out any stdout from the tool run
|
||||
logger.debug(f"Executed tool '{self.tool_name}', logging stdout from tool run: \n")
|
||||
@@ -94,12 +100,14 @@ class ToolExecutionSandbox:
|
||||
os.environ.clear()
|
||||
os.environ.update(original_env) # Restore original environment variables
|
||||
|
||||
def run_local_dir_sandbox(self, code: str) -> Optional[SandboxRunResult]:
|
||||
def run_local_dir_sandbox(self, agent_state: AgentState) -> Optional[SandboxRunResult]:
|
||||
sbx_config = self.sandbox_config_manager.get_or_create_default_sandbox_config(sandbox_type=SandboxType.LOCAL, actor=self.user)
|
||||
local_configs = sbx_config.get_local_config()
|
||||
|
||||
# Get environment variables for the sandbox
|
||||
env_vars = self.sandbox_config_manager.get_sandbox_env_vars_as_dict(sandbox_config_id=sbx_config.id, actor=self.user, limit=100)
|
||||
env = os.environ.copy()
|
||||
env.update(env_vars)
|
||||
|
||||
# Safety checks
|
||||
if not os.path.isdir(local_configs.sandbox_dir):
|
||||
@@ -107,6 +115,12 @@ class ToolExecutionSandbox:
|
||||
|
||||
# Write the code to a temp file in the sandbox_dir
|
||||
with tempfile.NamedTemporaryFile(mode="w", dir=local_configs.sandbox_dir, suffix=".py", delete=False) as temp_file:
|
||||
if local_configs.use_venv:
|
||||
# If using venv, we need to wrap with special string markers to separate out the output and the stdout (since it is all in stdout)
|
||||
code = self.generate_execution_script(agent_state=agent_state, wrap_print_with_markers=True)
|
||||
else:
|
||||
code = self.generate_execution_script(agent_state=agent_state)
|
||||
|
||||
temp_file.write(code)
|
||||
temp_file.flush()
|
||||
temp_file_path = temp_file.name
|
||||
@@ -114,39 +128,122 @@ class ToolExecutionSandbox:
|
||||
# Save the old stdout
|
||||
old_stdout = sys.stdout
|
||||
try:
|
||||
# Redirect stdout to capture script output
|
||||
captured_stdout = io.StringIO()
|
||||
sys.stdout = captured_stdout
|
||||
|
||||
# Execute the temp file
|
||||
with self.temporary_env_vars(env_vars):
|
||||
result = runpy.run_path(temp_file_path, init_globals=env_vars)
|
||||
|
||||
# Fetch the result
|
||||
func_result = result.get(self.LOCAL_SANDBOX_RESULT_VAR_NAME)
|
||||
func_return, agent_state = self.parse_best_effort(func_result)
|
||||
|
||||
# Restore stdout and collect captured output
|
||||
sys.stdout = old_stdout
|
||||
stdout_output = captured_stdout.getvalue()
|
||||
|
||||
return SandboxRunResult(
|
||||
func_return=func_return,
|
||||
agent_state=agent_state,
|
||||
stdout=[stdout_output],
|
||||
sandbox_config_fingerprint=sbx_config.fingerprint(),
|
||||
)
|
||||
if local_configs.use_venv:
|
||||
return self.run_local_dir_sandbox_venv(sbx_config, env, temp_file_path)
|
||||
else:
|
||||
return self.run_local_dir_sandbox_runpy(sbx_config, env_vars, temp_file_path, old_stdout)
|
||||
except Exception as e:
|
||||
logger.error(f"Executing tool {self.tool_name} has an unexpected error: {e}")
|
||||
logger.error(f"Logging out tool {self.tool_name} auto-generated code for debugging: \n\n{code}")
|
||||
raise e
|
||||
finally:
|
||||
# Clean up the temp file and restore stdout
|
||||
sys.stdout = old_stdout
|
||||
os.remove(temp_file_path)
|
||||
|
||||
def run_local_dir_sandbox_venv(self, sbx_config: SandboxConfig, env: Dict[str, str], temp_file_path: str) -> SandboxRunResult:
|
||||
local_configs = sbx_config.get_local_config()
|
||||
venv_path = os.path.join(local_configs.sandbox_dir, local_configs.venv_name)
|
||||
|
||||
# Safety checks for the venv
|
||||
# Verify that the venv path exists and is a directory
|
||||
if not os.path.isdir(venv_path):
|
||||
logger.warning(f"Virtual environment directory does not exist at: {venv_path}, creating one now...")
|
||||
self.create_venv_for_local_sandbox(sandbox_dir_path=local_configs.sandbox_dir, venv_path=venv_path, env=env)
|
||||
|
||||
# Ensure the python interpreter exists in the virtual environment
|
||||
python_executable = os.path.join(venv_path, "bin", "python3")
|
||||
if not os.path.isfile(python_executable):
|
||||
raise FileNotFoundError(f"Python executable not found in virtual environment: {python_executable}")
|
||||
|
||||
# Set up env for venv
|
||||
env["VIRTUAL_ENV"] = venv_path
|
||||
env["PATH"] = os.path.join(venv_path, "bin") + ":" + env["PATH"]
|
||||
|
||||
# Execute the code in a restricted subprocess
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[os.path.join(venv_path, "bin", "python3"), temp_file_path],
|
||||
env=env,
|
||||
cwd=local_configs.sandbox_dir, # Restrict execution to sandbox_dir
|
||||
timeout=60,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
if result.stderr:
|
||||
logger.error(f"Sandbox execution error: {result.stderr}")
|
||||
raise RuntimeError(f"Sandbox execution error: {result.stderr}")
|
||||
|
||||
func_result, stdout = self.parse_out_function_results_markers(result.stdout)
|
||||
func_return, agent_state = self.parse_best_effort(func_result)
|
||||
return SandboxRunResult(
|
||||
func_return=func_return, agent_state=agent_state, stdout=[stdout], sandbox_config_fingerprint=sbx_config.fingerprint()
|
||||
)
|
||||
except subprocess.TimeoutExpired:
|
||||
raise TimeoutError(f"Executing tool {self.tool_name} has timed out.")
|
||||
except subprocess.CalledProcessError as e:
|
||||
raise RuntimeError(f"Executing tool {self.tool_name} has process error: {e}")
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Executing tool {self.tool_name} has an unexpected error: {e}")
|
||||
|
||||
def run_local_dir_sandbox_runpy(
|
||||
self, sbx_config: SandboxConfig, env_vars: Dict[str, str], temp_file_path: str, old_stdout: TextIO
|
||||
) -> SandboxRunResult:
|
||||
# Redirect stdout to capture script output
|
||||
captured_stdout = io.StringIO()
|
||||
sys.stdout = captured_stdout
|
||||
|
||||
# Execute the temp file
|
||||
with self.temporary_env_vars(env_vars):
|
||||
result = runpy.run_path(temp_file_path, init_globals=env_vars)
|
||||
|
||||
# Fetch the result
|
||||
func_result = result.get(self.LOCAL_SANDBOX_RESULT_VAR_NAME)
|
||||
func_return, agent_state = self.parse_best_effort(func_result)
|
||||
|
||||
# Restore stdout and collect captured output
|
||||
sys.stdout = old_stdout
|
||||
stdout_output = captured_stdout.getvalue()
|
||||
|
||||
return SandboxRunResult(
|
||||
func_return=func_return,
|
||||
agent_state=agent_state,
|
||||
stdout=[stdout_output],
|
||||
sandbox_config_fingerprint=sbx_config.fingerprint(),
|
||||
)
|
||||
|
||||
def parse_out_function_results_markers(self, text: str):
|
||||
marker_len = len(self.LOCAL_SANDBOX_RESULT_START_MARKER)
|
||||
start_index = text.index(self.LOCAL_SANDBOX_RESULT_START_MARKER) + marker_len
|
||||
end_index = text.index(self.LOCAL_SANDBOX_RESULT_END_MARKER)
|
||||
return text[start_index:end_index], text[: start_index - marker_len] + text[end_index + +marker_len :]
|
||||
|
||||
def create_venv_for_local_sandbox(self, sandbox_dir_path: str, venv_path: str, env: Dict[str, str]):
|
||||
# Step 1: Create the virtual environment
|
||||
venv.create(venv_path, with_pip=True)
|
||||
|
||||
pip_path = os.path.join(venv_path, "bin", "pip")
|
||||
try:
|
||||
# Step 2: Upgrade pip
|
||||
logger.info("Upgrading pip in the virtual environment...")
|
||||
subprocess.run([pip_path, "install", "--upgrade", "pip"], env=env, check=True)
|
||||
|
||||
# Step 3: Install packages from requirements.txt if provided
|
||||
requirements_txt_path = os.path.join(sandbox_dir_path, self.REQUIREMENT_TXT_NAME)
|
||||
if os.path.isfile(requirements_txt_path):
|
||||
logger.info(f"Installing packages from requirements file: {requirements_txt_path}")
|
||||
subprocess.run([pip_path, "install", "-r", requirements_txt_path], env=env, check=True)
|
||||
logger.info("Successfully installed packages from requirements.txt")
|
||||
else:
|
||||
logger.warning("No requirements.txt file provided or the file does not exist. Skipping package installation.")
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
logger.error(f"Error while setting up the virtual environment: {e}")
|
||||
raise RuntimeError(f"Failed to set up the virtual environment: {e}")
|
||||
|
||||
# e2b sandbox specific functions
|
||||
|
||||
def run_e2b_sandbox(self, code: str) -> Optional[SandboxRunResult]:
|
||||
def run_e2b_sandbox(self, agent_state: AgentState) -> Optional[SandboxRunResult]:
|
||||
sbx_config = self.sandbox_config_manager.get_or_create_default_sandbox_config(sandbox_type=SandboxType.E2B, actor=self.user)
|
||||
sbx = self.get_running_e2b_sandbox_with_same_state(sbx_config)
|
||||
if not sbx or self.force_recreate:
|
||||
@@ -158,6 +255,7 @@ class ToolExecutionSandbox:
|
||||
# Get environment variables for the sandbox
|
||||
# TODO: We set limit to 100 here, but maybe we want it uncapped? Realistically this should be fine.
|
||||
env_vars = self.sandbox_config_manager.get_sandbox_env_vars_as_dict(sandbox_config_id=sbx_config.id, actor=self.user, limit=100)
|
||||
code = self.generate_execution_script(agent_state=agent_state)
|
||||
execution = sbx.run_code(code, envs=env_vars)
|
||||
if execution.error is not None:
|
||||
logger.error(f"Executing tool {self.tool_name} failed with {execution.error}")
|
||||
@@ -236,13 +334,14 @@ class ToolExecutionSandbox:
|
||||
args.append(arg.arg)
|
||||
return args
|
||||
|
||||
def generate_execution_script(self, agent_state: AgentState) -> str:
|
||||
def generate_execution_script(self, agent_state: AgentState, wrap_print_with_markers: bool = False) -> str:
|
||||
"""
|
||||
Generate code to run inside of execution sandbox.
|
||||
Passes into a serialized agent state into the code, to be accessed by the tool.
|
||||
|
||||
Args:
|
||||
agent_state (AgentState): The agent state
|
||||
wrap_print_with_markers (bool): If true, we wrap the final statement with a `print` and wrap with special markers
|
||||
|
||||
Returns:
|
||||
code (str): The generated code strong
|
||||
@@ -286,7 +385,14 @@ class ToolExecutionSandbox:
|
||||
code += (
|
||||
f"{self.LOCAL_SANDBOX_RESULT_VAR_NAME} = base64.b64encode(pickle.dumps({self.LOCAL_SANDBOX_RESULT_VAR_NAME})).decode('utf-8')\n"
|
||||
)
|
||||
code += f"{self.LOCAL_SANDBOX_RESULT_VAR_NAME}\n"
|
||||
|
||||
if wrap_print_with_markers:
|
||||
code += f"sys.stdout.write('{self.LOCAL_SANDBOX_RESULT_START_MARKER}')\n"
|
||||
code += f"sys.stdout.write(str({self.LOCAL_SANDBOX_RESULT_VAR_NAME}))\n"
|
||||
code += f"sys.stdout.write('{self.LOCAL_SANDBOX_RESULT_END_MARKER}')\n"
|
||||
else:
|
||||
code += f"{self.LOCAL_SANDBOX_RESULT_VAR_NAME}\n"
|
||||
|
||||
return code
|
||||
|
||||
def _convert_param_to_value(self, param_type: str, raw_value: str) -> str:
|
||||
|
||||
Reference in New Issue
Block a user