diff --git a/letta/schemas/tool.py b/letta/schemas/tool.py index 858921b7..9b94f82b 100644 --- a/letta/schemas/tool.py +++ b/letta/schemas/tool.py @@ -129,6 +129,19 @@ class ToolCreate(LettaBase): False, description="If set to True, then this tool will potentially be executed concurrently with other tools. Default False." ) + @model_validator(mode="after") + def validate_typescript_requires_schema(self): + """ + TypeScript tools require an explicit json_schema since we don't support + docstring parsing for TypeScript. + """ + if self.source_type == "typescript" and not self.json_schema: + raise ValueError( + "TypeScript tools require an explicit json_schema parameter. " + "Unlike Python tools, schema cannot be auto-generated from TypeScript source code." + ) + return self + @classmethod def from_mcp(cls, mcp_server_name: str, mcp_tool: MCPTool) -> "ToolCreate": from letta.functions.helpers import generate_mcp_tool_wrapper diff --git a/letta/services/tool_executor/sandbox_tool_executor.py b/letta/services/tool_executor/sandbox_tool_executor.py index 5946a018..ea5af641 100644 --- a/letta/services/tool_executor/sandbox_tool_executor.py +++ b/letta/services/tool_executor/sandbox_tool_executor.py @@ -149,8 +149,12 @@ class SandboxToolExecutor(ToolExecutor): @staticmethod def _prepare_function_args(function_args: JsonDict, tool: Tool, function_name: str) -> dict: """Prepare function arguments with proper type coercion.""" + # Skip Python AST parsing for TypeScript tools - they use json_schema for type info + if tool.source_type == "typescript": + return function_args + try: - # Parse the source code to extract function annotations + # Parse the source code to extract function annotations (Python only) annotations = get_function_annotations_from_source(tool.source_code, function_name) # Coerce the function arguments to the correct types based on the annotations return coerce_dict_args_by_annotations(function_args, annotations) diff --git a/letta/services/tool_sandbox/base.py b/letta/services/tool_sandbox/base.py index 31c02bda..14ded2cb 100644 --- a/letta/services/tool_sandbox/base.py +++ b/letta/services/tool_sandbox/base.py @@ -7,6 +7,7 @@ from typing import Any, Dict, Optional from letta.functions.helpers import generate_model_from_args_json_schema from letta.otel.tracing import trace_method from letta.schemas.agent import AgentState +from letta.schemas.enums import ToolSourceType from letta.schemas.sandbox_config import SandboxConfig from letta.schemas.tool import Tool from letta.schemas.tool_execution_result import ToolExecutionResult @@ -63,18 +64,28 @@ class AsyncToolSandboxBase(ABC): f"Agent attempted to invoke tool {self.tool_name} that does not exist for organization {self.user.organization_id}" ) - # Check for reserved keyword arguments - tool_arguments = parse_function_arguments(self.tool.source_code, self.tool.name) - - # TODO: deprecate this - if "agent_state" in tool_arguments: - self.inject_agent_state = True - else: + # TypeScript tools do not support agent_state or agent_id injection as function params + # (these are Python-only features). Instead, agent_id is exposed via LETTA_AGENT_ID env var. + if self.is_typescript_tool(): self.inject_agent_state = False + self.inject_agent_id = False + else: + # Check for reserved keyword arguments (Python tools only) + tool_arguments = parse_function_arguments(self.tool.source_code, self.tool.name) + + # TODO: deprecate this + # Note: AgentState injection is a legacy feature for Python tools only. + if "agent_state" in tool_arguments: + self.inject_agent_state = True + else: + self.inject_agent_state = False + + self.inject_agent_id = "agent_id" in tool_arguments # Always inject Letta client (available as `client` variable in sandbox) + # For Python: letta_client package + # For TypeScript: @letta-ai/letta-client package self.inject_letta_client = True - self.inject_agent_id = "agent_id" in tool_arguments self.is_async_function = self._detect_async_function() self._initialized = True @@ -98,13 +109,23 @@ class AsyncToolSandboxBase(ABC): """ raise NotImplementedError + def is_typescript_tool(self) -> bool: + """Check if the tool is a TypeScript tool based on source_type.""" + if self.tool and self.tool.source_type: + return self.tool.source_type == ToolSourceType.typescript or self.tool.source_type == "typescript" + return False + @trace_method async def generate_execution_script(self, agent_state: Optional[AgentState], wrap_print_with_markers: bool = False) -> str: """ Generate code to run inside of execution sandbox. Serialize the agent state and arguments, call the tool, - then base64-encode/pickle the result. Constructs the python file. + then base64-encode/pickle the result. Constructs the python file (or TypeScript for TS tools). """ await self._init_async() + + # Route to TypeScript generator for TypeScript tools + if self.is_typescript_tool(): + return await self._generate_typescript_execution_script(agent_state) future_import = False schema_code = None @@ -321,6 +342,25 @@ class AsyncToolSandboxBase(ABC): return "\n".join(lines) + "\n" + async def _generate_typescript_execution_script(self, agent_state: Optional[AgentState]) -> str: + """ + Generate TypeScript code to run inside of execution sandbox. + + TypeScript tools: + - Do NOT support agent_state injection (stateless) + - agent_id is available via process.env.LETTA_AGENT_ID + - Return JSON-serialized results instead of pickle + - Require explicit json_schema (no docstring parsing) + """ + from letta.services.tool_sandbox.typescript_generator import generate_typescript_execution_script + + return generate_typescript_execution_script( + tool_name=self.tool.name, + tool_source_code=self.tool.source_code, + args=self.args, + json_schema=self.tool.json_schema, + ) + def initialize_param(self, name: str, raw_value: JsonValue) -> str: """ Produce code for initializing a single parameter in the generated script. diff --git a/letta/services/tool_sandbox/e2b_sandbox.py b/letta/services/tool_sandbox/e2b_sandbox.py index 8b8c56f5..1834d9ad 100644 --- a/letta/services/tool_sandbox/e2b_sandbox.py +++ b/letta/services/tool_sandbox/e2b_sandbox.py @@ -14,6 +14,7 @@ from letta.schemas.tool import Tool from letta.schemas.tool_execution_result import ToolExecutionResult from letta.services.helpers.tool_parser_helper import parse_stdout_best_effort from letta.services.tool_sandbox.base import AsyncToolSandboxBase +from letta.services.tool_sandbox.typescript_generator import parse_typescript_result from letta.types import JsonDict from letta.utils import get_friendly_error_msg @@ -80,15 +81,25 @@ class AsyncToolSandboxE2B(AsyncToolSandboxBase): envs = await self._gather_env_vars(agent_state, additional_env_vars, sbx_config.id, is_local=False) code = await self.generate_execution_script(agent_state=agent_state) + # Determine language based on tool source_type + is_typescript = self.is_typescript_tool() + language = "ts" if is_typescript else None + try: - logger.info(f"E2B execution started for ID {e2b_sandbox.sandbox_id}: {self.tool_name}") + logger.info(f"E2B execution started for ID {e2b_sandbox.sandbox_id}: {self.tool_name} (language={language or 'python'})") log_event( "e2b_execution_started", - {"tool": self.tool_name, "sandbox_id": e2b_sandbox.sandbox_id, "code": code, "env_vars": envs}, + { + "tool": self.tool_name, + "sandbox_id": e2b_sandbox.sandbox_id, + "code": code, + "env_vars": envs, + "language": language or "python", + }, ) start_time = time.perf_counter() try: - execution = await e2b_sandbox.run_code(code, envs=envs) + execution = await e2b_sandbox.run_code(code, envs=envs, language=language) except asyncio.CancelledError: execution_time = time.perf_counter() - start_time logger.info(f"E2B execution cancelled for ID {e2b_sandbox.sandbox_id}: {self.tool_name} (took {execution_time:.2f}s)") @@ -102,7 +113,11 @@ class AsyncToolSandboxE2B(AsyncToolSandboxBase): logger.info(f"E2B execution completed in {execution_time:.2f}s for sandbox {e2b_sandbox.sandbox_id}, tool: {self.tool_name}") if execution.results: - func_return, agent_state = parse_stdout_best_effort(execution.results[0].text) + # Use appropriate result parser based on language + if is_typescript: + func_return, agent_state = parse_typescript_result(execution.results[0].text) + else: + func_return, agent_state = parse_stdout_best_effort(execution.results[0].text) logger.info(f"E2B execution succeeded for ID {e2b_sandbox.sandbox_id}: {self.tool_name} (took {execution_time:.2f}s)") log_event( "e2b_execution_succeeded", @@ -257,8 +272,82 @@ class AsyncToolSandboxE2B(AsyncToolSandboxBase): "error": str(e), }, ) + # Kill sandbox to prevent resource leak + await sbx.kill() raise RuntimeError(error_msg) from e + # Install tool-specific npm requirements (for TypeScript tools) + if self.tool and self.tool.npm_requirements: + for npm_requirement in self.tool.npm_requirements: + package_str = str(npm_requirement) + log_event( + "tool_npm_install_started", + { + "sandbox_id": sbx.sandbox_id, + "package": package_str, + "tool_name": self.tool.name, + }, + ) + try: + await sbx.commands.run(f"npm install {package_str}") + log_event( + "tool_npm_install_finished", + { + "sandbox_id": sbx.sandbox_id, + "package": package_str, + "tool_name": self.tool.name, + }, + ) + except CommandExitException as e: + error_msg = f"Failed to install tool npm requirement '{package_str}' for tool '{self.tool.name}' in E2B sandbox. This may be due to package version incompatibility. Error: {e}" + logger.error(error_msg) + log_event( + "tool_npm_install_failed", + { + "sandbox_id": sbx.sandbox_id, + "package": package_str, + "tool_name": self.tool.name, + "error": str(e), + }, + ) + # Kill sandbox to prevent resource leak + await sbx.kill() + raise RuntimeError(error_msg) from e + + # Auto-install @letta-ai/letta-client for TypeScript tools (similar to letta_client for Python) + if self.is_typescript_tool(): + letta_client_package = "@letta-ai/letta-client" + log_event( + "letta_client_npm_install_started", + { + "sandbox_id": sbx.sandbox_id, + "package": letta_client_package, + "tool_name": self.tool.name if self.tool else None, + }, + ) + try: + await sbx.commands.run(f"npm install {letta_client_package}") + log_event( + "letta_client_npm_install_finished", + { + "sandbox_id": sbx.sandbox_id, + "package": letta_client_package, + "tool_name": self.tool.name if self.tool else None, + }, + ) + except CommandExitException as e: + # Log warning but don't fail - the client is optional for tools that don't need it + logger.warning(f"Failed to install {letta_client_package} in E2B sandbox: {e}") + log_event( + "letta_client_npm_install_failed", + { + "sandbox_id": sbx.sandbox_id, + "package": letta_client_package, + "tool_name": self.tool.name if self.tool else None, + "error": str(e), + }, + ) + return sbx def use_top_level_await(self) -> bool: diff --git a/letta/services/tool_sandbox/typescript_generator.py b/letta/services/tool_sandbox/typescript_generator.py new file mode 100644 index 00000000..5370b925 --- /dev/null +++ b/letta/services/tool_sandbox/typescript_generator.py @@ -0,0 +1,236 @@ +"""TypeScript execution script generator for sandbox execution.""" + +import json +import re +from typing import Any, Dict, Optional + +from letta.types import JsonDict, JsonValue + + +def convert_param_to_ts_value(param_type: Optional[str], raw_value: JsonValue) -> str: + """ + Convert parameter to TypeScript code representation based on JSON schema type. + + Args: + param_type: The JSON schema type (string, number, integer, boolean, array, object) + raw_value: The raw value to convert + + Returns: + A string representation of the value in TypeScript syntax + """ + # Handle null values first - return TypeScript null (not Python's "None") + if raw_value is None: + return "null" + + if param_type == "string": + # Use JSON.stringify for proper string escaping + return json.dumps(raw_value) + if param_type in ("number", "integer"): + return str(raw_value) + if param_type == "boolean": + if isinstance(raw_value, bool): + return "true" if raw_value else "false" + if isinstance(raw_value, int) and raw_value in (0, 1): + return "true" if raw_value else "false" + if isinstance(raw_value, str) and raw_value.strip().lower() in ("true", "false"): + return raw_value.strip().lower() + raise ValueError(f"Invalid boolean value: {raw_value}") + if param_type in ("array", "object"): + return json.dumps(raw_value) + # Default: use JSON serialization + return json.dumps(raw_value) + + +def extract_typescript_function_name(source_code: str) -> Optional[str]: + """ + Extract the exported function name from TypeScript source code. + + Args: + source_code: TypeScript source code + + Returns: + The function name if found, None otherwise + """ + # Match both regular and async exported functions + patterns = [ + r"export\s+function\s+(\w+)", + r"export\s+async\s+function\s+(\w+)", + ] + + for pattern in patterns: + match = re.search(pattern, source_code) + if match: + return match.group(1) + + return None + + +def is_async_typescript_function(source_code: str, function_name: str) -> bool: + """ + Detect if a TypeScript function is async. + + Args: + source_code: TypeScript source code + function_name: The function name to check + + Returns: + True if the function is async, False otherwise + """ + # Match async function declaration: export async function foo + pattern1 = rf"export\s+async\s+function\s+{re.escape(function_name)}" + if re.search(pattern1, source_code): + return True + + # Match async arrow function: export const foo = async + pattern2 = rf"export\s+const\s+{re.escape(function_name)}\s*=\s*async" + if re.search(pattern2, source_code): + return True + + return False + + +def generate_typescript_execution_script( + tool_name: str, + tool_source_code: str, + args: JsonDict, + json_schema: Dict[str, Any], + env_vars_to_inject: Optional[Dict[str, str]] = None, +) -> str: + """ + Generate a TypeScript execution script for running a tool in E2B sandbox. + + The generated script: + 1. Imports and initializes the Letta client (available as `client` variable) + 2. Initializes arguments as TypeScript constants + 3. Includes the user's tool source code + 4. Calls the function and serializes the result as JSON + + Note: TypeScript tools do NOT support agent_state injection (legacy Python feature). + The agent_id is available via process.env.LETTA_AGENT_ID environment variable. + + Args: + tool_name: Name of the tool function + tool_source_code: The TypeScript source code of the tool + args: Arguments to pass to the function + json_schema: JSON schema describing the function parameters + env_vars_to_inject: Optional environment variables to inject + + Returns: + Generated TypeScript code ready for execution + """ + lines: list[str] = [] + + # Extract user's import statements - they must be at the top of the file for ESM + import_pattern = r"^import\s+.+?['\"];?\s*$" + user_imports = re.findall(import_pattern, tool_source_code, re.MULTILINE) + source_without_imports = re.sub(import_pattern, "", tool_source_code, flags=re.MULTILINE) + + # Add user imports at the very top (ESM requires imports at top of file) + if user_imports: + for imp in user_imports: + lines.append(imp.strip()) + lines.append("") + + # Import and initialize Letta client (similar to Python's letta_client injection) + # The client is available as `client` variable in the tool's scope + # Use dynamic import with try/catch to gracefully handle missing package + lines.extend( + [ + "// Initialize Letta client for TypeScript tool execution", + "let client: any = null;", + "try {", + " const { LettaClient } = await import('@letta-ai/letta-client');", + " const apiKey = process.env.LETTA_API_KEY;", + " if (apiKey) {", + " client = new LettaClient({ apiKey });", + " }", + "} catch (e) {", + " // Package not available - client remains null", + "}", + "", + ] + ) + + # Initialize arguments + # Handle null json_schema (can happen if ToolUpdate sets source_type without schema) + if json_schema is None: + json_schema = {} + properties = json_schema.get("parameters", {}).get("properties", {}) + for param_name, param_value in args.items(): + param_spec = properties.get(param_name, {}) + param_type = param_spec.get("type") + ts_value = convert_param_to_ts_value(param_type, param_value) + lines.append(f"const {param_name} = {ts_value};") + + if args: + lines.append("") + + # Add the user's source code (imports already extracted), stripping 'export' keywords + stripped_source = re.sub(r"\bexport\s+", "", source_without_imports) + lines.append(stripped_source.strip()) + lines.append("") + + # Detect if function is async + is_async = is_async_typescript_function(tool_source_code, tool_name) + + # Generate function call with arguments in correct order + # Use the order from json_schema (required + optional) to ensure positional args are correct + parameters = json_schema.get("parameters", {}) + required_params = parameters.get("required", []) + schema_properties = parameters.get("properties", {}) + + # Build ordered param list: required params first (in order), then any remaining args + ordered_params = [] + for param in required_params: + if param in args: + ordered_params.append(param) + # Add any remaining params that weren't in required (optional params) + for param in schema_properties.keys(): + if param in args and param not in ordered_params: + ordered_params.append(param) + # Fallback: add any args not in schema (shouldn't happen, but be safe) + for param in args.keys(): + if param not in ordered_params: + ordered_params.append(param) + + params_str = ", ".join(ordered_params) + func_call = f"{tool_name}({params_str})" + + # Execute the function and output result as JSON + # E2B supports top-level await for TypeScript + if is_async: + lines.append(f"const _result = await {func_call};") + else: + lines.append(f"const _result = {func_call};") + + # Serialize the result - we use JSON for TypeScript (not pickle like Python) + # The output format matches what the Python sandbox expects + # Note: agent_state is always null for TypeScript tools (not supported) + lines.append("const _output = { results: _result, agent_state: null };") + lines.append("JSON.stringify(_output);") + + return "\n".join(lines) + "\n" + + +def parse_typescript_result(result_text: str) -> tuple[Any, None]: + """ + Parse the result from TypeScript tool execution. + + TypeScript tools return JSON-serialized results instead of pickle. + + Args: + result_text: The JSON string output from the TypeScript execution + + Returns: + Tuple of (function_return_value, agent_state) + Note: agent_state is always None for TypeScript tools + """ + if not result_text: + return None, None + + try: + result = json.loads(result_text) + return result.get("results"), result.get("agent_state") + except json.JSONDecodeError: + # If JSON parsing fails, return the raw text + return result_text, None diff --git a/tests/integration_test_typescript_tool_execution_sandbox.py b/tests/integration_test_typescript_tool_execution_sandbox.py new file mode 100644 index 00000000..97528214 --- /dev/null +++ b/tests/integration_test_typescript_tool_execution_sandbox.py @@ -0,0 +1,666 @@ +"""Integration tests for TypeScript tool execution in E2B sandbox.""" + +import uuid + +import pytest +from sqlalchemy import delete + +from letta.config import LettaConfig +from letta.orm.sandbox_config import SandboxConfig, SandboxEnvironmentVariable +from letta.schemas.enums import ToolType +from letta.schemas.npm_requirement import NpmRequirement +from letta.schemas.organization import Organization +from letta.schemas.tool import Tool as PydanticTool, ToolCreate +from letta.schemas.user import User +from letta.server.server import SyncServer +from letta.services.organization_manager import OrganizationManager +from letta.services.tool_executor.tool_execution_sandbox import ToolExecutionSandbox +from letta.services.tool_manager import ToolManager +from letta.services.tool_sandbox.e2b_sandbox import AsyncToolSandboxE2B +from letta.services.user_manager import UserManager + +# Constants +namespace = uuid.NAMESPACE_DNS +org_name = str(uuid.uuid5(namespace, "test-typescript-tool-execution-org")) +user_name = str(uuid.uuid5(namespace, "test-typescript-tool-execution-user")) + + +# Fixtures +@pytest.fixture(scope="module") +def server(): + """Creates a SyncServer instance for testing.""" + config = LettaConfig.load() + config.save() + server = SyncServer(init_with_default_org_and_user=True) + yield server + + +@pytest.fixture(autouse=True) +async def clear_tables(): + """Fixture to clear sandbox tables before each test.""" + from letta.server.db import db_registry + + async with db_registry.async_session() as session: + await session.execute(delete(SandboxEnvironmentVariable)) + await session.execute(delete(SandboxConfig)) + + +@pytest.fixture +async def test_organization(): + """Fixture to create and return the default organization.""" + org = await OrganizationManager().create_organization_async(Organization(name=org_name)) + yield org + + +@pytest.fixture +async def test_user(test_organization): + """Fixture to create and return the default user within the default organization.""" + user = await UserManager().create_actor_async(User(name=user_name, organization_id=test_organization.id)) + yield user + + +# TypeScript Tool Fixtures + + +@pytest.fixture +async def add_numbers_ts_tool(test_user): + """Simple TypeScript tool that adds two numbers.""" + tool = PydanticTool( + name="add_numbers", + description="Add two numbers together", + source_code=""" +export function add_numbers(x: number, y: number): number { + return x + y; +} +""", + source_type="typescript", + tool_type=ToolType.CUSTOM, + json_schema={ + "name": "add_numbers", + "description": "Add two numbers together", + "parameters": { + "type": "object", + "properties": { + "x": {"type": "number", "description": "First number"}, + "y": {"type": "number", "description": "Second number"}, + }, + "required": ["x", "y"], + }, + }, + ) + tool = await ToolManager().create_or_update_tool_async(tool, test_user) + yield tool + + +@pytest.fixture +async def string_concat_ts_tool(test_user): + """TypeScript tool that concatenates strings.""" + tool = PydanticTool( + name="concat_strings", + description="Concatenate two strings", + source_code=""" +export function concat_strings(a: string, b: string): string { + return a + b; +} +""", + source_type="typescript", + tool_type=ToolType.CUSTOM, + json_schema={ + "name": "concat_strings", + "description": "Concatenate two strings", + "parameters": { + "type": "object", + "properties": { + "a": {"type": "string", "description": "First string"}, + "b": {"type": "string", "description": "Second string"}, + }, + "required": ["a", "b"], + }, + }, + ) + tool = await ToolManager().create_or_update_tool_async(tool, test_user) + yield tool + + +@pytest.fixture +async def async_ts_tool(test_user): + """Async TypeScript tool.""" + tool = PydanticTool( + name="async_delay", + description="An async function that returns after a small delay", + source_code=""" +export async function async_delay(message: string): Promise { + await new Promise(resolve => setTimeout(resolve, 100)); + return `Delayed: ${message}`; +} +""", + source_type="typescript", + tool_type=ToolType.CUSTOM, + json_schema={ + "name": "async_delay", + "description": "An async function that returns after a small delay", + "parameters": { + "type": "object", + "properties": { + "message": {"type": "string", "description": "Message to return"}, + }, + "required": ["message"], + }, + }, + ) + tool = await ToolManager().create_or_update_tool_async(tool, test_user) + yield tool + + +@pytest.fixture +async def error_ts_tool(test_user): + """TypeScript tool that throws an error.""" + tool = PydanticTool( + name="throw_error", + description="A function that always throws an error", + source_code=""" +export function throw_error(): never { + throw new Error("This is an intentional test error"); +} +""", + source_type="typescript", + tool_type=ToolType.CUSTOM, + json_schema={ + "name": "throw_error", + "description": "A function that always throws an error", + "parameters": { + "type": "object", + "properties": {}, + "required": [], + }, + }, + ) + tool = await ToolManager().create_or_update_tool_async(tool, test_user) + yield tool + + +@pytest.fixture +async def array_ts_tool(test_user): + """TypeScript tool that works with arrays.""" + tool = PydanticTool( + name="sum_array", + description="Sum all numbers in an array", + source_code=""" +export function sum_array(numbers: number[]): number { + return numbers.reduce((acc, curr) => acc + curr, 0); +} +""", + source_type="typescript", + tool_type=ToolType.CUSTOM, + json_schema={ + "name": "sum_array", + "description": "Sum all numbers in an array", + "parameters": { + "type": "object", + "properties": { + "numbers": { + "type": "array", + "items": {"type": "number"}, + "description": "Array of numbers to sum", + }, + }, + "required": ["numbers"], + }, + }, + ) + tool = await ToolManager().create_or_update_tool_async(tool, test_user) + yield tool + + +@pytest.fixture +async def object_ts_tool(test_user): + """TypeScript tool that works with objects.""" + tool = PydanticTool( + name="get_name", + description="Extract name from a person object", + source_code=""" +export function get_name(person: { firstName: string; lastName: string }): string { + return `${person.firstName} ${person.lastName}`; +} +""", + source_type="typescript", + tool_type=ToolType.CUSTOM, + json_schema={ + "name": "get_name", + "description": "Extract name from a person object", + "parameters": { + "type": "object", + "properties": { + "person": { + "type": "object", + "properties": { + "firstName": {"type": "string"}, + "lastName": {"type": "string"}, + }, + "required": ["firstName", "lastName"], + "description": "Person object with name fields", + }, + }, + "required": ["person"], + }, + }, + ) + tool = await ToolManager().create_or_update_tool_async(tool, test_user) + yield tool + + +# Tests + + +class TestTypescriptToolValidation: + """Tests for TypeScript tool validation.""" + + def test_typescript_tool_requires_json_schema(self): + """Test that TypeScript tools require explicit json_schema.""" + with pytest.raises(ValueError, match="TypeScript tools require an explicit json_schema"): + ToolCreate( + source_code='export function test(): string { return "hello"; }', + source_type="typescript", + # Deliberately not providing json_schema + ) + + def test_typescript_tool_with_schema_is_valid(self, test_user): + """Test that TypeScript tools with json_schema are valid.""" + tool_create = ToolCreate( + source_code='export function test(): string { return "hello"; }', + source_type="typescript", + json_schema={ + "name": "test", + "description": "Test function", + "parameters": {"type": "object", "properties": {}, "required": []}, + }, + ) + assert tool_create.source_type == "typescript" + assert tool_create.json_schema is not None + + def test_python_tool_without_schema_is_valid(self): + """Test that Python tools can still be created without explicit json_schema.""" + tool_create = ToolCreate( + source_code='def test(): return "hello"', + source_type="python", + # No json_schema - should be auto-generated for Python + ) + assert tool_create.source_type == "python" + + @pytest.mark.asyncio + async def test_typescript_tool_does_not_inject_agent_state(self, test_user): + """Test that TypeScript tools do not support agent_state injection (legacy Python feature).""" + # Create a TypeScript tool that has 'agent_state' in its parameters + # (this shouldn't happen in practice, but we test the sandbox behavior) + tool = PydanticTool( + name="test_no_agent_state", + description="Test tool", + source_code=""" +export function test_no_agent_state(x: number): number { + return x; +} +""", + source_type="typescript", + tool_type=ToolType.CUSTOM, + json_schema={ + "name": "test_no_agent_state", + "description": "Test tool", + "parameters": { + "type": "object", + "properties": { + "x": {"type": "number"}, + }, + "required": ["x"], + }, + }, + ) + tool = await ToolManager().create_or_update_tool_async(tool, test_user) + + sandbox = AsyncToolSandboxE2B( + tool_name=tool.name, + args={"x": 42}, + user=test_user, + tool_id=tool.id, + tool_object=tool, + ) + + # Initialize the sandbox to trigger the _init_async method + await sandbox._init_async() + + # Verify agent_state injection is disabled for TypeScript tools + assert sandbox.inject_agent_state is False + + +@pytest.mark.e2b_sandbox +class TestTypescriptToolExecution: + """Tests for TypeScript tool execution in E2B sandbox.""" + + @pytest.mark.asyncio + async def test_e2b_typescript_add_numbers(self, check_e2b_key_is_set, add_numbers_ts_tool, test_user): + """Test basic TypeScript tool execution with number arguments.""" + sandbox = AsyncToolSandboxE2B( + tool_name=add_numbers_ts_tool.name, + args={"x": 10, "y": 5}, + user=test_user, + tool_id=add_numbers_ts_tool.id, + tool_object=add_numbers_ts_tool, + ) + result = await sandbox.run() + + assert result.status == "success" + assert result.func_return == 15 + + @pytest.mark.asyncio + async def test_e2b_typescript_string_concat(self, check_e2b_key_is_set, string_concat_ts_tool, test_user): + """Test TypeScript tool execution with string arguments.""" + sandbox = AsyncToolSandboxE2B( + tool_name=string_concat_ts_tool.name, + args={"a": "Hello, ", "b": "World!"}, + user=test_user, + tool_id=string_concat_ts_tool.id, + tool_object=string_concat_ts_tool, + ) + result = await sandbox.run() + + assert result.status == "success" + assert result.func_return == "Hello, World!" + + @pytest.mark.asyncio + async def test_e2b_typescript_async_function(self, check_e2b_key_is_set, async_ts_tool, test_user): + """Test async TypeScript tool execution.""" + sandbox = AsyncToolSandboxE2B( + tool_name=async_ts_tool.name, + args={"message": "test"}, + user=test_user, + tool_id=async_ts_tool.id, + tool_object=async_ts_tool, + ) + result = await sandbox.run() + + assert result.status == "success" + assert result.func_return == "Delayed: test" + + @pytest.mark.asyncio + async def test_e2b_typescript_error_handling(self, check_e2b_key_is_set, error_ts_tool, test_user): + """Test TypeScript tool error handling.""" + sandbox = AsyncToolSandboxE2B( + tool_name=error_ts_tool.name, + args={}, + user=test_user, + tool_id=error_ts_tool.id, + tool_object=error_ts_tool, + ) + result = await sandbox.run() + + assert result.status == "error" + assert "error" in result.func_return.lower() or "Error" in str(result.stderr) + + @pytest.mark.asyncio + async def test_e2b_typescript_array_argument(self, check_e2b_key_is_set, array_ts_tool, test_user): + """Test TypeScript tool with array argument.""" + sandbox = AsyncToolSandboxE2B( + tool_name=array_ts_tool.name, + args={"numbers": [1, 2, 3, 4, 5]}, + user=test_user, + tool_id=array_ts_tool.id, + tool_object=array_ts_tool, + ) + result = await sandbox.run() + + assert result.status == "success" + assert result.func_return == 15 + + @pytest.mark.asyncio + async def test_e2b_typescript_object_argument(self, check_e2b_key_is_set, object_ts_tool, test_user): + """Test TypeScript tool with object argument.""" + sandbox = AsyncToolSandboxE2B( + tool_name=object_ts_tool.name, + args={"person": {"firstName": "John", "lastName": "Doe"}}, + user=test_user, + tool_id=object_ts_tool.id, + tool_object=object_ts_tool, + ) + result = await sandbox.run() + + assert result.status == "success" + assert result.func_return == "John Doe" + + +@pytest.mark.e2b_sandbox +class TestTypescriptToolWithLettaClient: + """Tests for TypeScript tools with Letta client integration.""" + + @pytest.mark.asyncio + async def test_e2b_typescript_letta_client_available(self, check_e2b_key_is_set, test_user): + """Test that the Letta client is available in TypeScript sandbox (as null when no API key).""" + # Create a tool that checks if the client variable exists + tool = PydanticTool( + name="check_client", + description="Check if Letta client is available", + source_code=""" +export function check_client(): string { + // client is injected by the sandbox - it will be null if no API key + return client === null ? "client is null (no API key)" : "client is available"; +} +""", + source_type="typescript", + tool_type=ToolType.CUSTOM, + json_schema={ + "name": "check_client", + "description": "Check if Letta client is available", + "parameters": { + "type": "object", + "properties": {}, + "required": [], + }, + }, + ) + tool = await ToolManager().create_or_update_tool_async(tool, test_user) + + sandbox = AsyncToolSandboxE2B( + tool_name=tool.name, + args={}, + user=test_user, + tool_id=tool.id, + tool_object=tool, + ) + result = await sandbox.run() + + assert result.status == "success" + # Without LETTA_API_KEY, client should be null + assert "client" in result.func_return.lower() + + +@pytest.mark.e2b_sandbox +class TestTypescriptToolWithNpmPackages: + """Tests for TypeScript tools with npm package dependencies.""" + + @pytest.mark.asyncio + async def test_e2b_typescript_with_npm_package(self, check_e2b_key_is_set, test_user): + """Test TypeScript tool execution with npm package dependency.""" + # Create a tool that uses the 'lodash' npm package + tool = PydanticTool( + name="lodash_capitalize", + description="Capitalize a string using lodash", + source_code=""" +import _ from 'lodash'; + +export function lodash_capitalize(text: string): string { + return _.capitalize(text); +} +""", + source_type="typescript", + tool_type=ToolType.CUSTOM, + npm_requirements=[NpmRequirement(name="lodash"), NpmRequirement(name="@types/lodash")], + json_schema={ + "name": "lodash_capitalize", + "description": "Capitalize a string using lodash", + "parameters": { + "type": "object", + "properties": { + "text": {"type": "string", "description": "Text to capitalize"}, + }, + "required": ["text"], + }, + }, + ) + tool = await ToolManager().create_or_update_tool_async(tool, test_user) + + sandbox = AsyncToolSandboxE2B( + tool_name=tool.name, + args={"text": "hello world"}, + user=test_user, + tool_id=tool.id, + tool_object=tool, + ) + result = await sandbox.run() + + assert result.status == "success" + assert result.func_return == "Hello world" + + +class TestTypescriptGeneratorUnit: + """Unit tests for TypeScript code generation.""" + + def test_convert_param_to_ts_value_string(self): + """Test string parameter conversion.""" + from letta.services.tool_sandbox.typescript_generator import convert_param_to_ts_value + + assert convert_param_to_ts_value("string", "hello") == '"hello"' + assert convert_param_to_ts_value("string", 'hello "world"') == '"hello \\"world\\""' + + def test_convert_param_to_ts_value_number(self): + """Test number parameter conversion.""" + from letta.services.tool_sandbox.typescript_generator import convert_param_to_ts_value + + assert convert_param_to_ts_value("number", 42) == "42" + assert convert_param_to_ts_value("number", 3.14) == "3.14" + assert convert_param_to_ts_value("integer", 100) == "100" + + def test_convert_param_to_ts_value_boolean(self): + """Test boolean parameter conversion.""" + from letta.services.tool_sandbox.typescript_generator import convert_param_to_ts_value + + assert convert_param_to_ts_value("boolean", True) == "true" + assert convert_param_to_ts_value("boolean", False) == "false" + + def test_convert_param_to_ts_value_array(self): + """Test array parameter conversion.""" + from letta.services.tool_sandbox.typescript_generator import convert_param_to_ts_value + + assert convert_param_to_ts_value("array", [1, 2, 3]) == "[1, 2, 3]" + + def test_convert_param_to_ts_value_object(self): + """Test object parameter conversion.""" + from letta.services.tool_sandbox.typescript_generator import convert_param_to_ts_value + + result = convert_param_to_ts_value("object", {"key": "value"}) + assert result == '{"key": "value"}' + + def test_extract_typescript_function_name(self): + """Test TypeScript function name extraction.""" + from letta.services.tool_sandbox.typescript_generator import extract_typescript_function_name + + assert extract_typescript_function_name("export function myFunc(): void {}") == "myFunc" + assert extract_typescript_function_name("export async function asyncFunc(): Promise {}") == "asyncFunc" + assert extract_typescript_function_name("function notExported(): void {}") is None + + def test_is_async_typescript_function(self): + """Test async function detection.""" + from letta.services.tool_sandbox.typescript_generator import is_async_typescript_function + + assert is_async_typescript_function("export async function test(): Promise {}", "test") is True + assert is_async_typescript_function("export function test(): void {}", "test") is False + + def test_generate_typescript_execution_script(self): + """Test TypeScript execution script generation.""" + from letta.services.tool_sandbox.typescript_generator import generate_typescript_execution_script + + script = generate_typescript_execution_script( + tool_name="add", + tool_source_code="export function add(x: number, y: number): number { return x + y; }", + args={"x": 1, "y": 2}, + json_schema={ + "name": "add", + "parameters": { + "type": "object", + "properties": { + "x": {"type": "number"}, + "y": {"type": "number"}, + }, + }, + }, + ) + + # Verify Letta client initialization is included (using dynamic import with try/catch) + assert "let client: any = null;" in script + assert "await import('@letta-ai/letta-client')" in script + assert "process.env.LETTA_API_KEY" in script + assert "} catch (e) {" in script # Graceful fallback if package not available + + # Verify arguments are initialized + assert "const x = 1;" in script + assert "const y = 2;" in script + assert "function add" in script # 'export' is stripped for inline execution + assert "const _result = add(x, y);" in script + assert "JSON.stringify(_output);" in script + + # Verify agent_state is null (not supported for TypeScript) + assert "agent_state: null" in script + + def test_parse_typescript_result(self): + """Test TypeScript result parsing.""" + from letta.services.tool_sandbox.typescript_generator import parse_typescript_result + + # Valid JSON result + result, agent_state = parse_typescript_result('{"results": 42, "agent_state": null}') + assert result == 42 + assert agent_state is None + + # Invalid JSON returns raw text + result, agent_state = parse_typescript_result("not json") + assert result == "not json" + assert agent_state is None + + # Empty result + result, agent_state = parse_typescript_result("") + assert result is None + assert agent_state is None + + def test_sandbox_tool_executor_skips_ast_for_typescript(self): + """Test that SandboxToolExecutor._prepare_function_args skips AST parsing for TypeScript.""" + from letta.schemas.tool import Tool as PydanticTool + from letta.services.tool_executor.sandbox_tool_executor import SandboxToolExecutor + + ts_tool = PydanticTool( + name="ts_func", + description="Test TypeScript tool", + source_code=""" +export function ts_func(a: number, b: string): string { + return b.repeat(a); +} +""", + source_type="typescript", + tool_type=ToolType.CUSTOM, + json_schema={ + "name": "ts_func", + "parameters": { + "type": "object", + "properties": { + "a": {"type": "number"}, + "b": {"type": "string"}, + }, + "required": ["a", "b"], + }, + }, + ) + + # This should NOT raise a SyntaxError - it should skip AST parsing for TypeScript + result = SandboxToolExecutor._prepare_function_args( + function_args={"a": 3, "b": "test"}, + tool=ts_tool, + function_name="ts_func", + ) + + # Should return original args unchanged (no type coercion for TypeScript) + assert result == {"a": 3, "b": "test"}