feat: add TypeScript tool support for E2B sandbox execution (#8796)

* feat: add TypeScript tool support for E2B sandbox execution

This change implements TypeScript tool support using the same E2B path as Python tools:

- Add TypeScript execution script generator (typescript_generator.py)
- Modify E2B sandbox to detect TypeScript tools and use language='ts'
- Add npm package installation for TypeScript tool dependencies
- Add validation requiring json_schema for TypeScript tools
- Add comprehensive integration tests for TypeScript tools

TypeScript tools:
- Require explicit json_schema (no docstring parsing)
- Use JSON serialization instead of pickle for results
- Support async functions with top-level await
- Support npm package dependencies via npm_requirements field

Closes #8793

Co-authored-by: Sarah Wooders <sarahwooders@users.noreply.github.com>

* fix: disable AgentState for TypeScript tools & add letta-client injection

Based on Sarah's feedback:
1. AgentState is a legacy Python-only feature, disabled for TS tools
2. Added @letta-ai/letta-client npm package injection for TypeScript
   (similar to letta_client for Python)

Changes:
- base.py: Explicitly set inject_agent_state=False for TypeScript tools
- typescript_generator.py: Inject LettaClient initialization code
- e2b_sandbox.py: Auto-install @letta-ai/letta-client for TS tools
- Added tests verifying both behaviors

🤖 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Sarah Wooders <sarahwooders@users.noreply.github.com>
Co-Authored-By: Letta <noreply@letta.com>

* Update core-integration-tests.yml

* fix: convert TypeScript test fixtures to async

The OrganizationManager and UserManager no longer have sync methods,
only async variants. Updated all fixtures to use:
- create_organization_async
- create_actor_async
- create_or_update_tool_async

🤖 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* fix: skip Python AST parsing for TypeScript tools in sandbox base

The _init_async method was calling parse_function_arguments (which uses
Python's ast.parse) before checking if the tool was TypeScript, causing
SyntaxError when running TypeScript tools.

Moved the is_typescript_tool() check to happen first, skipping Python
AST parsing entirely for TypeScript tools.

🤖 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* letta_agent_id

* skip ast parsing for s

* add tool execution test

---------

Co-authored-by: letta-code <248085862+letta-code@users.noreply.github.com>
Co-authored-by: Sarah Wooders <sarahwooders@users.noreply.github.com>
Co-authored-by: Letta <noreply@letta.com>
Co-authored-by: Kian Jones <kian@letta.com>
This commit is contained in:
Sarah Wooders
2026-01-16 19:25:30 -08:00
parent 5c7bed7743
commit 7c1da1e9e2
6 changed files with 1062 additions and 14 deletions

View File

@@ -129,6 +129,19 @@ class ToolCreate(LettaBase):
False, description="If set to True, then this tool will potentially be executed concurrently with other tools. Default False."
)
@model_validator(mode="after")
def validate_typescript_requires_schema(self):
"""
TypeScript tools require an explicit json_schema since we don't support
docstring parsing for TypeScript.
"""
if self.source_type == "typescript" and not self.json_schema:
raise ValueError(
"TypeScript tools require an explicit json_schema parameter. "
"Unlike Python tools, schema cannot be auto-generated from TypeScript source code."
)
return self
@classmethod
def from_mcp(cls, mcp_server_name: str, mcp_tool: MCPTool) -> "ToolCreate":
from letta.functions.helpers import generate_mcp_tool_wrapper

View File

@@ -149,8 +149,12 @@ class SandboxToolExecutor(ToolExecutor):
@staticmethod
def _prepare_function_args(function_args: JsonDict, tool: Tool, function_name: str) -> dict:
"""Prepare function arguments with proper type coercion."""
# Skip Python AST parsing for TypeScript tools - they use json_schema for type info
if tool.source_type == "typescript":
return function_args
try:
# Parse the source code to extract function annotations
# Parse the source code to extract function annotations (Python only)
annotations = get_function_annotations_from_source(tool.source_code, function_name)
# Coerce the function arguments to the correct types based on the annotations
return coerce_dict_args_by_annotations(function_args, annotations)

View File

@@ -7,6 +7,7 @@ from typing import Any, Dict, Optional
from letta.functions.helpers import generate_model_from_args_json_schema
from letta.otel.tracing import trace_method
from letta.schemas.agent import AgentState
from letta.schemas.enums import ToolSourceType
from letta.schemas.sandbox_config import SandboxConfig
from letta.schemas.tool import Tool
from letta.schemas.tool_execution_result import ToolExecutionResult
@@ -63,18 +64,28 @@ class AsyncToolSandboxBase(ABC):
f"Agent attempted to invoke tool {self.tool_name} that does not exist for organization {self.user.organization_id}"
)
# Check for reserved keyword arguments
tool_arguments = parse_function_arguments(self.tool.source_code, self.tool.name)
# TODO: deprecate this
if "agent_state" in tool_arguments:
self.inject_agent_state = True
else:
# TypeScript tools do not support agent_state or agent_id injection as function params
# (these are Python-only features). Instead, agent_id is exposed via LETTA_AGENT_ID env var.
if self.is_typescript_tool():
self.inject_agent_state = False
self.inject_agent_id = False
else:
# Check for reserved keyword arguments (Python tools only)
tool_arguments = parse_function_arguments(self.tool.source_code, self.tool.name)
# TODO: deprecate this
# Note: AgentState injection is a legacy feature for Python tools only.
if "agent_state" in tool_arguments:
self.inject_agent_state = True
else:
self.inject_agent_state = False
self.inject_agent_id = "agent_id" in tool_arguments
# Always inject Letta client (available as `client` variable in sandbox)
# For Python: letta_client package
# For TypeScript: @letta-ai/letta-client package
self.inject_letta_client = True
self.inject_agent_id = "agent_id" in tool_arguments
self.is_async_function = self._detect_async_function()
self._initialized = True
@@ -98,13 +109,23 @@ class AsyncToolSandboxBase(ABC):
"""
raise NotImplementedError
def is_typescript_tool(self) -> bool:
"""Check if the tool is a TypeScript tool based on source_type."""
if self.tool and self.tool.source_type:
return self.tool.source_type == ToolSourceType.typescript or self.tool.source_type == "typescript"
return False
@trace_method
async def generate_execution_script(self, agent_state: Optional[AgentState], wrap_print_with_markers: bool = False) -> str:
"""
Generate code to run inside of execution sandbox. Serialize the agent state and arguments, call the tool,
then base64-encode/pickle the result. Constructs the python file.
then base64-encode/pickle the result. Constructs the python file (or TypeScript for TS tools).
"""
await self._init_async()
# Route to TypeScript generator for TypeScript tools
if self.is_typescript_tool():
return await self._generate_typescript_execution_script(agent_state)
future_import = False
schema_code = None
@@ -321,6 +342,25 @@ class AsyncToolSandboxBase(ABC):
return "\n".join(lines) + "\n"
async def _generate_typescript_execution_script(self, agent_state: Optional[AgentState]) -> str:
"""
Generate TypeScript code to run inside of execution sandbox.
TypeScript tools:
- Do NOT support agent_state injection (stateless)
- agent_id is available via process.env.LETTA_AGENT_ID
- Return JSON-serialized results instead of pickle
- Require explicit json_schema (no docstring parsing)
"""
from letta.services.tool_sandbox.typescript_generator import generate_typescript_execution_script
return generate_typescript_execution_script(
tool_name=self.tool.name,
tool_source_code=self.tool.source_code,
args=self.args,
json_schema=self.tool.json_schema,
)
def initialize_param(self, name: str, raw_value: JsonValue) -> str:
"""
Produce code for initializing a single parameter in the generated script.

View File

@@ -14,6 +14,7 @@ from letta.schemas.tool import Tool
from letta.schemas.tool_execution_result import ToolExecutionResult
from letta.services.helpers.tool_parser_helper import parse_stdout_best_effort
from letta.services.tool_sandbox.base import AsyncToolSandboxBase
from letta.services.tool_sandbox.typescript_generator import parse_typescript_result
from letta.types import JsonDict
from letta.utils import get_friendly_error_msg
@@ -80,15 +81,25 @@ class AsyncToolSandboxE2B(AsyncToolSandboxBase):
envs = await self._gather_env_vars(agent_state, additional_env_vars, sbx_config.id, is_local=False)
code = await self.generate_execution_script(agent_state=agent_state)
# Determine language based on tool source_type
is_typescript = self.is_typescript_tool()
language = "ts" if is_typescript else None
try:
logger.info(f"E2B execution started for ID {e2b_sandbox.sandbox_id}: {self.tool_name}")
logger.info(f"E2B execution started for ID {e2b_sandbox.sandbox_id}: {self.tool_name} (language={language or 'python'})")
log_event(
"e2b_execution_started",
{"tool": self.tool_name, "sandbox_id": e2b_sandbox.sandbox_id, "code": code, "env_vars": envs},
{
"tool": self.tool_name,
"sandbox_id": e2b_sandbox.sandbox_id,
"code": code,
"env_vars": envs,
"language": language or "python",
},
)
start_time = time.perf_counter()
try:
execution = await e2b_sandbox.run_code(code, envs=envs)
execution = await e2b_sandbox.run_code(code, envs=envs, language=language)
except asyncio.CancelledError:
execution_time = time.perf_counter() - start_time
logger.info(f"E2B execution cancelled for ID {e2b_sandbox.sandbox_id}: {self.tool_name} (took {execution_time:.2f}s)")
@@ -102,7 +113,11 @@ class AsyncToolSandboxE2B(AsyncToolSandboxBase):
logger.info(f"E2B execution completed in {execution_time:.2f}s for sandbox {e2b_sandbox.sandbox_id}, tool: {self.tool_name}")
if execution.results:
func_return, agent_state = parse_stdout_best_effort(execution.results[0].text)
# Use appropriate result parser based on language
if is_typescript:
func_return, agent_state = parse_typescript_result(execution.results[0].text)
else:
func_return, agent_state = parse_stdout_best_effort(execution.results[0].text)
logger.info(f"E2B execution succeeded for ID {e2b_sandbox.sandbox_id}: {self.tool_name} (took {execution_time:.2f}s)")
log_event(
"e2b_execution_succeeded",
@@ -257,8 +272,82 @@ class AsyncToolSandboxE2B(AsyncToolSandboxBase):
"error": str(e),
},
)
# Kill sandbox to prevent resource leak
await sbx.kill()
raise RuntimeError(error_msg) from e
# Install tool-specific npm requirements (for TypeScript tools)
if self.tool and self.tool.npm_requirements:
for npm_requirement in self.tool.npm_requirements:
package_str = str(npm_requirement)
log_event(
"tool_npm_install_started",
{
"sandbox_id": sbx.sandbox_id,
"package": package_str,
"tool_name": self.tool.name,
},
)
try:
await sbx.commands.run(f"npm install {package_str}")
log_event(
"tool_npm_install_finished",
{
"sandbox_id": sbx.sandbox_id,
"package": package_str,
"tool_name": self.tool.name,
},
)
except CommandExitException as e:
error_msg = f"Failed to install tool npm requirement '{package_str}' for tool '{self.tool.name}' in E2B sandbox. This may be due to package version incompatibility. Error: {e}"
logger.error(error_msg)
log_event(
"tool_npm_install_failed",
{
"sandbox_id": sbx.sandbox_id,
"package": package_str,
"tool_name": self.tool.name,
"error": str(e),
},
)
# Kill sandbox to prevent resource leak
await sbx.kill()
raise RuntimeError(error_msg) from e
# Auto-install @letta-ai/letta-client for TypeScript tools (similar to letta_client for Python)
if self.is_typescript_tool():
letta_client_package = "@letta-ai/letta-client"
log_event(
"letta_client_npm_install_started",
{
"sandbox_id": sbx.sandbox_id,
"package": letta_client_package,
"tool_name": self.tool.name if self.tool else None,
},
)
try:
await sbx.commands.run(f"npm install {letta_client_package}")
log_event(
"letta_client_npm_install_finished",
{
"sandbox_id": sbx.sandbox_id,
"package": letta_client_package,
"tool_name": self.tool.name if self.tool else None,
},
)
except CommandExitException as e:
# Log warning but don't fail - the client is optional for tools that don't need it
logger.warning(f"Failed to install {letta_client_package} in E2B sandbox: {e}")
log_event(
"letta_client_npm_install_failed",
{
"sandbox_id": sbx.sandbox_id,
"package": letta_client_package,
"tool_name": self.tool.name if self.tool else None,
"error": str(e),
},
)
return sbx
def use_top_level_await(self) -> bool:

View File

@@ -0,0 +1,236 @@
"""TypeScript execution script generator for sandbox execution."""
import json
import re
from typing import Any, Dict, Optional
from letta.types import JsonDict, JsonValue
def convert_param_to_ts_value(param_type: Optional[str], raw_value: JsonValue) -> str:
"""
Convert parameter to TypeScript code representation based on JSON schema type.
Args:
param_type: The JSON schema type (string, number, integer, boolean, array, object)
raw_value: The raw value to convert
Returns:
A string representation of the value in TypeScript syntax
"""
# Handle null values first - return TypeScript null (not Python's "None")
if raw_value is None:
return "null"
if param_type == "string":
# Use JSON.stringify for proper string escaping
return json.dumps(raw_value)
if param_type in ("number", "integer"):
return str(raw_value)
if param_type == "boolean":
if isinstance(raw_value, bool):
return "true" if raw_value else "false"
if isinstance(raw_value, int) and raw_value in (0, 1):
return "true" if raw_value else "false"
if isinstance(raw_value, str) and raw_value.strip().lower() in ("true", "false"):
return raw_value.strip().lower()
raise ValueError(f"Invalid boolean value: {raw_value}")
if param_type in ("array", "object"):
return json.dumps(raw_value)
# Default: use JSON serialization
return json.dumps(raw_value)
def extract_typescript_function_name(source_code: str) -> Optional[str]:
"""
Extract the exported function name from TypeScript source code.
Args:
source_code: TypeScript source code
Returns:
The function name if found, None otherwise
"""
# Match both regular and async exported functions
patterns = [
r"export\s+function\s+(\w+)",
r"export\s+async\s+function\s+(\w+)",
]
for pattern in patterns:
match = re.search(pattern, source_code)
if match:
return match.group(1)
return None
def is_async_typescript_function(source_code: str, function_name: str) -> bool:
"""
Detect if a TypeScript function is async.
Args:
source_code: TypeScript source code
function_name: The function name to check
Returns:
True if the function is async, False otherwise
"""
# Match async function declaration: export async function foo
pattern1 = rf"export\s+async\s+function\s+{re.escape(function_name)}"
if re.search(pattern1, source_code):
return True
# Match async arrow function: export const foo = async
pattern2 = rf"export\s+const\s+{re.escape(function_name)}\s*=\s*async"
if re.search(pattern2, source_code):
return True
return False
def generate_typescript_execution_script(
tool_name: str,
tool_source_code: str,
args: JsonDict,
json_schema: Dict[str, Any],
env_vars_to_inject: Optional[Dict[str, str]] = None,
) -> str:
"""
Generate a TypeScript execution script for running a tool in E2B sandbox.
The generated script:
1. Imports and initializes the Letta client (available as `client` variable)
2. Initializes arguments as TypeScript constants
3. Includes the user's tool source code
4. Calls the function and serializes the result as JSON
Note: TypeScript tools do NOT support agent_state injection (legacy Python feature).
The agent_id is available via process.env.LETTA_AGENT_ID environment variable.
Args:
tool_name: Name of the tool function
tool_source_code: The TypeScript source code of the tool
args: Arguments to pass to the function
json_schema: JSON schema describing the function parameters
env_vars_to_inject: Optional environment variables to inject
Returns:
Generated TypeScript code ready for execution
"""
lines: list[str] = []
# Extract user's import statements - they must be at the top of the file for ESM
import_pattern = r"^import\s+.+?['\"];?\s*$"
user_imports = re.findall(import_pattern, tool_source_code, re.MULTILINE)
source_without_imports = re.sub(import_pattern, "", tool_source_code, flags=re.MULTILINE)
# Add user imports at the very top (ESM requires imports at top of file)
if user_imports:
for imp in user_imports:
lines.append(imp.strip())
lines.append("")
# Import and initialize Letta client (similar to Python's letta_client injection)
# The client is available as `client` variable in the tool's scope
# Use dynamic import with try/catch to gracefully handle missing package
lines.extend(
[
"// Initialize Letta client for TypeScript tool execution",
"let client: any = null;",
"try {",
" const { LettaClient } = await import('@letta-ai/letta-client');",
" const apiKey = process.env.LETTA_API_KEY;",
" if (apiKey) {",
" client = new LettaClient({ apiKey });",
" }",
"} catch (e) {",
" // Package not available - client remains null",
"}",
"",
]
)
# Initialize arguments
# Handle null json_schema (can happen if ToolUpdate sets source_type without schema)
if json_schema is None:
json_schema = {}
properties = json_schema.get("parameters", {}).get("properties", {})
for param_name, param_value in args.items():
param_spec = properties.get(param_name, {})
param_type = param_spec.get("type")
ts_value = convert_param_to_ts_value(param_type, param_value)
lines.append(f"const {param_name} = {ts_value};")
if args:
lines.append("")
# Add the user's source code (imports already extracted), stripping 'export' keywords
stripped_source = re.sub(r"\bexport\s+", "", source_without_imports)
lines.append(stripped_source.strip())
lines.append("")
# Detect if function is async
is_async = is_async_typescript_function(tool_source_code, tool_name)
# Generate function call with arguments in correct order
# Use the order from json_schema (required + optional) to ensure positional args are correct
parameters = json_schema.get("parameters", {})
required_params = parameters.get("required", [])
schema_properties = parameters.get("properties", {})
# Build ordered param list: required params first (in order), then any remaining args
ordered_params = []
for param in required_params:
if param in args:
ordered_params.append(param)
# Add any remaining params that weren't in required (optional params)
for param in schema_properties.keys():
if param in args and param not in ordered_params:
ordered_params.append(param)
# Fallback: add any args not in schema (shouldn't happen, but be safe)
for param in args.keys():
if param not in ordered_params:
ordered_params.append(param)
params_str = ", ".join(ordered_params)
func_call = f"{tool_name}({params_str})"
# Execute the function and output result as JSON
# E2B supports top-level await for TypeScript
if is_async:
lines.append(f"const _result = await {func_call};")
else:
lines.append(f"const _result = {func_call};")
# Serialize the result - we use JSON for TypeScript (not pickle like Python)
# The output format matches what the Python sandbox expects
# Note: agent_state is always null for TypeScript tools (not supported)
lines.append("const _output = { results: _result, agent_state: null };")
lines.append("JSON.stringify(_output);")
return "\n".join(lines) + "\n"
def parse_typescript_result(result_text: str) -> tuple[Any, None]:
"""
Parse the result from TypeScript tool execution.
TypeScript tools return JSON-serialized results instead of pickle.
Args:
result_text: The JSON string output from the TypeScript execution
Returns:
Tuple of (function_return_value, agent_state)
Note: agent_state is always None for TypeScript tools
"""
if not result_text:
return None, None
try:
result = json.loads(result_text)
return result.get("results"), result.get("agent_state")
except json.JSONDecodeError:
# If JSON parsing fails, return the raw text
return result_text, None

View File

@@ -0,0 +1,666 @@
"""Integration tests for TypeScript tool execution in E2B sandbox."""
import uuid
import pytest
from sqlalchemy import delete
from letta.config import LettaConfig
from letta.orm.sandbox_config import SandboxConfig, SandboxEnvironmentVariable
from letta.schemas.enums import ToolType
from letta.schemas.npm_requirement import NpmRequirement
from letta.schemas.organization import Organization
from letta.schemas.tool import Tool as PydanticTool, ToolCreate
from letta.schemas.user import User
from letta.server.server import SyncServer
from letta.services.organization_manager import OrganizationManager
from letta.services.tool_executor.tool_execution_sandbox import ToolExecutionSandbox
from letta.services.tool_manager import ToolManager
from letta.services.tool_sandbox.e2b_sandbox import AsyncToolSandboxE2B
from letta.services.user_manager import UserManager
# Constants
namespace = uuid.NAMESPACE_DNS
org_name = str(uuid.uuid5(namespace, "test-typescript-tool-execution-org"))
user_name = str(uuid.uuid5(namespace, "test-typescript-tool-execution-user"))
# Fixtures
@pytest.fixture(scope="module")
def server():
"""Creates a SyncServer instance for testing."""
config = LettaConfig.load()
config.save()
server = SyncServer(init_with_default_org_and_user=True)
yield server
@pytest.fixture(autouse=True)
async def clear_tables():
"""Fixture to clear sandbox tables before each test."""
from letta.server.db import db_registry
async with db_registry.async_session() as session:
await session.execute(delete(SandboxEnvironmentVariable))
await session.execute(delete(SandboxConfig))
@pytest.fixture
async def test_organization():
"""Fixture to create and return the default organization."""
org = await OrganizationManager().create_organization_async(Organization(name=org_name))
yield org
@pytest.fixture
async def test_user(test_organization):
"""Fixture to create and return the default user within the default organization."""
user = await UserManager().create_actor_async(User(name=user_name, organization_id=test_organization.id))
yield user
# TypeScript Tool Fixtures
@pytest.fixture
async def add_numbers_ts_tool(test_user):
"""Simple TypeScript tool that adds two numbers."""
tool = PydanticTool(
name="add_numbers",
description="Add two numbers together",
source_code="""
export function add_numbers(x: number, y: number): number {
return x + y;
}
""",
source_type="typescript",
tool_type=ToolType.CUSTOM,
json_schema={
"name": "add_numbers",
"description": "Add two numbers together",
"parameters": {
"type": "object",
"properties": {
"x": {"type": "number", "description": "First number"},
"y": {"type": "number", "description": "Second number"},
},
"required": ["x", "y"],
},
},
)
tool = await ToolManager().create_or_update_tool_async(tool, test_user)
yield tool
@pytest.fixture
async def string_concat_ts_tool(test_user):
"""TypeScript tool that concatenates strings."""
tool = PydanticTool(
name="concat_strings",
description="Concatenate two strings",
source_code="""
export function concat_strings(a: string, b: string): string {
return a + b;
}
""",
source_type="typescript",
tool_type=ToolType.CUSTOM,
json_schema={
"name": "concat_strings",
"description": "Concatenate two strings",
"parameters": {
"type": "object",
"properties": {
"a": {"type": "string", "description": "First string"},
"b": {"type": "string", "description": "Second string"},
},
"required": ["a", "b"],
},
},
)
tool = await ToolManager().create_or_update_tool_async(tool, test_user)
yield tool
@pytest.fixture
async def async_ts_tool(test_user):
"""Async TypeScript tool."""
tool = PydanticTool(
name="async_delay",
description="An async function that returns after a small delay",
source_code="""
export async function async_delay(message: string): Promise<string> {
await new Promise(resolve => setTimeout(resolve, 100));
return `Delayed: ${message}`;
}
""",
source_type="typescript",
tool_type=ToolType.CUSTOM,
json_schema={
"name": "async_delay",
"description": "An async function that returns after a small delay",
"parameters": {
"type": "object",
"properties": {
"message": {"type": "string", "description": "Message to return"},
},
"required": ["message"],
},
},
)
tool = await ToolManager().create_or_update_tool_async(tool, test_user)
yield tool
@pytest.fixture
async def error_ts_tool(test_user):
"""TypeScript tool that throws an error."""
tool = PydanticTool(
name="throw_error",
description="A function that always throws an error",
source_code="""
export function throw_error(): never {
throw new Error("This is an intentional test error");
}
""",
source_type="typescript",
tool_type=ToolType.CUSTOM,
json_schema={
"name": "throw_error",
"description": "A function that always throws an error",
"parameters": {
"type": "object",
"properties": {},
"required": [],
},
},
)
tool = await ToolManager().create_or_update_tool_async(tool, test_user)
yield tool
@pytest.fixture
async def array_ts_tool(test_user):
"""TypeScript tool that works with arrays."""
tool = PydanticTool(
name="sum_array",
description="Sum all numbers in an array",
source_code="""
export function sum_array(numbers: number[]): number {
return numbers.reduce((acc, curr) => acc + curr, 0);
}
""",
source_type="typescript",
tool_type=ToolType.CUSTOM,
json_schema={
"name": "sum_array",
"description": "Sum all numbers in an array",
"parameters": {
"type": "object",
"properties": {
"numbers": {
"type": "array",
"items": {"type": "number"},
"description": "Array of numbers to sum",
},
},
"required": ["numbers"],
},
},
)
tool = await ToolManager().create_or_update_tool_async(tool, test_user)
yield tool
@pytest.fixture
async def object_ts_tool(test_user):
"""TypeScript tool that works with objects."""
tool = PydanticTool(
name="get_name",
description="Extract name from a person object",
source_code="""
export function get_name(person: { firstName: string; lastName: string }): string {
return `${person.firstName} ${person.lastName}`;
}
""",
source_type="typescript",
tool_type=ToolType.CUSTOM,
json_schema={
"name": "get_name",
"description": "Extract name from a person object",
"parameters": {
"type": "object",
"properties": {
"person": {
"type": "object",
"properties": {
"firstName": {"type": "string"},
"lastName": {"type": "string"},
},
"required": ["firstName", "lastName"],
"description": "Person object with name fields",
},
},
"required": ["person"],
},
},
)
tool = await ToolManager().create_or_update_tool_async(tool, test_user)
yield tool
# Tests
class TestTypescriptToolValidation:
"""Tests for TypeScript tool validation."""
def test_typescript_tool_requires_json_schema(self):
"""Test that TypeScript tools require explicit json_schema."""
with pytest.raises(ValueError, match="TypeScript tools require an explicit json_schema"):
ToolCreate(
source_code='export function test(): string { return "hello"; }',
source_type="typescript",
# Deliberately not providing json_schema
)
def test_typescript_tool_with_schema_is_valid(self, test_user):
"""Test that TypeScript tools with json_schema are valid."""
tool_create = ToolCreate(
source_code='export function test(): string { return "hello"; }',
source_type="typescript",
json_schema={
"name": "test",
"description": "Test function",
"parameters": {"type": "object", "properties": {}, "required": []},
},
)
assert tool_create.source_type == "typescript"
assert tool_create.json_schema is not None
def test_python_tool_without_schema_is_valid(self):
"""Test that Python tools can still be created without explicit json_schema."""
tool_create = ToolCreate(
source_code='def test(): return "hello"',
source_type="python",
# No json_schema - should be auto-generated for Python
)
assert tool_create.source_type == "python"
@pytest.mark.asyncio
async def test_typescript_tool_does_not_inject_agent_state(self, test_user):
"""Test that TypeScript tools do not support agent_state injection (legacy Python feature)."""
# Create a TypeScript tool that has 'agent_state' in its parameters
# (this shouldn't happen in practice, but we test the sandbox behavior)
tool = PydanticTool(
name="test_no_agent_state",
description="Test tool",
source_code="""
export function test_no_agent_state(x: number): number {
return x;
}
""",
source_type="typescript",
tool_type=ToolType.CUSTOM,
json_schema={
"name": "test_no_agent_state",
"description": "Test tool",
"parameters": {
"type": "object",
"properties": {
"x": {"type": "number"},
},
"required": ["x"],
},
},
)
tool = await ToolManager().create_or_update_tool_async(tool, test_user)
sandbox = AsyncToolSandboxE2B(
tool_name=tool.name,
args={"x": 42},
user=test_user,
tool_id=tool.id,
tool_object=tool,
)
# Initialize the sandbox to trigger the _init_async method
await sandbox._init_async()
# Verify agent_state injection is disabled for TypeScript tools
assert sandbox.inject_agent_state is False
@pytest.mark.e2b_sandbox
class TestTypescriptToolExecution:
"""Tests for TypeScript tool execution in E2B sandbox."""
@pytest.mark.asyncio
async def test_e2b_typescript_add_numbers(self, check_e2b_key_is_set, add_numbers_ts_tool, test_user):
"""Test basic TypeScript tool execution with number arguments."""
sandbox = AsyncToolSandboxE2B(
tool_name=add_numbers_ts_tool.name,
args={"x": 10, "y": 5},
user=test_user,
tool_id=add_numbers_ts_tool.id,
tool_object=add_numbers_ts_tool,
)
result = await sandbox.run()
assert result.status == "success"
assert result.func_return == 15
@pytest.mark.asyncio
async def test_e2b_typescript_string_concat(self, check_e2b_key_is_set, string_concat_ts_tool, test_user):
"""Test TypeScript tool execution with string arguments."""
sandbox = AsyncToolSandboxE2B(
tool_name=string_concat_ts_tool.name,
args={"a": "Hello, ", "b": "World!"},
user=test_user,
tool_id=string_concat_ts_tool.id,
tool_object=string_concat_ts_tool,
)
result = await sandbox.run()
assert result.status == "success"
assert result.func_return == "Hello, World!"
@pytest.mark.asyncio
async def test_e2b_typescript_async_function(self, check_e2b_key_is_set, async_ts_tool, test_user):
"""Test async TypeScript tool execution."""
sandbox = AsyncToolSandboxE2B(
tool_name=async_ts_tool.name,
args={"message": "test"},
user=test_user,
tool_id=async_ts_tool.id,
tool_object=async_ts_tool,
)
result = await sandbox.run()
assert result.status == "success"
assert result.func_return == "Delayed: test"
@pytest.mark.asyncio
async def test_e2b_typescript_error_handling(self, check_e2b_key_is_set, error_ts_tool, test_user):
"""Test TypeScript tool error handling."""
sandbox = AsyncToolSandboxE2B(
tool_name=error_ts_tool.name,
args={},
user=test_user,
tool_id=error_ts_tool.id,
tool_object=error_ts_tool,
)
result = await sandbox.run()
assert result.status == "error"
assert "error" in result.func_return.lower() or "Error" in str(result.stderr)
@pytest.mark.asyncio
async def test_e2b_typescript_array_argument(self, check_e2b_key_is_set, array_ts_tool, test_user):
"""Test TypeScript tool with array argument."""
sandbox = AsyncToolSandboxE2B(
tool_name=array_ts_tool.name,
args={"numbers": [1, 2, 3, 4, 5]},
user=test_user,
tool_id=array_ts_tool.id,
tool_object=array_ts_tool,
)
result = await sandbox.run()
assert result.status == "success"
assert result.func_return == 15
@pytest.mark.asyncio
async def test_e2b_typescript_object_argument(self, check_e2b_key_is_set, object_ts_tool, test_user):
"""Test TypeScript tool with object argument."""
sandbox = AsyncToolSandboxE2B(
tool_name=object_ts_tool.name,
args={"person": {"firstName": "John", "lastName": "Doe"}},
user=test_user,
tool_id=object_ts_tool.id,
tool_object=object_ts_tool,
)
result = await sandbox.run()
assert result.status == "success"
assert result.func_return == "John Doe"
@pytest.mark.e2b_sandbox
class TestTypescriptToolWithLettaClient:
"""Tests for TypeScript tools with Letta client integration."""
@pytest.mark.asyncio
async def test_e2b_typescript_letta_client_available(self, check_e2b_key_is_set, test_user):
"""Test that the Letta client is available in TypeScript sandbox (as null when no API key)."""
# Create a tool that checks if the client variable exists
tool = PydanticTool(
name="check_client",
description="Check if Letta client is available",
source_code="""
export function check_client(): string {
// client is injected by the sandbox - it will be null if no API key
return client === null ? "client is null (no API key)" : "client is available";
}
""",
source_type="typescript",
tool_type=ToolType.CUSTOM,
json_schema={
"name": "check_client",
"description": "Check if Letta client is available",
"parameters": {
"type": "object",
"properties": {},
"required": [],
},
},
)
tool = await ToolManager().create_or_update_tool_async(tool, test_user)
sandbox = AsyncToolSandboxE2B(
tool_name=tool.name,
args={},
user=test_user,
tool_id=tool.id,
tool_object=tool,
)
result = await sandbox.run()
assert result.status == "success"
# Without LETTA_API_KEY, client should be null
assert "client" in result.func_return.lower()
@pytest.mark.e2b_sandbox
class TestTypescriptToolWithNpmPackages:
"""Tests for TypeScript tools with npm package dependencies."""
@pytest.mark.asyncio
async def test_e2b_typescript_with_npm_package(self, check_e2b_key_is_set, test_user):
"""Test TypeScript tool execution with npm package dependency."""
# Create a tool that uses the 'lodash' npm package
tool = PydanticTool(
name="lodash_capitalize",
description="Capitalize a string using lodash",
source_code="""
import _ from 'lodash';
export function lodash_capitalize(text: string): string {
return _.capitalize(text);
}
""",
source_type="typescript",
tool_type=ToolType.CUSTOM,
npm_requirements=[NpmRequirement(name="lodash"), NpmRequirement(name="@types/lodash")],
json_schema={
"name": "lodash_capitalize",
"description": "Capitalize a string using lodash",
"parameters": {
"type": "object",
"properties": {
"text": {"type": "string", "description": "Text to capitalize"},
},
"required": ["text"],
},
},
)
tool = await ToolManager().create_or_update_tool_async(tool, test_user)
sandbox = AsyncToolSandboxE2B(
tool_name=tool.name,
args={"text": "hello world"},
user=test_user,
tool_id=tool.id,
tool_object=tool,
)
result = await sandbox.run()
assert result.status == "success"
assert result.func_return == "Hello world"
class TestTypescriptGeneratorUnit:
"""Unit tests for TypeScript code generation."""
def test_convert_param_to_ts_value_string(self):
"""Test string parameter conversion."""
from letta.services.tool_sandbox.typescript_generator import convert_param_to_ts_value
assert convert_param_to_ts_value("string", "hello") == '"hello"'
assert convert_param_to_ts_value("string", 'hello "world"') == '"hello \\"world\\""'
def test_convert_param_to_ts_value_number(self):
"""Test number parameter conversion."""
from letta.services.tool_sandbox.typescript_generator import convert_param_to_ts_value
assert convert_param_to_ts_value("number", 42) == "42"
assert convert_param_to_ts_value("number", 3.14) == "3.14"
assert convert_param_to_ts_value("integer", 100) == "100"
def test_convert_param_to_ts_value_boolean(self):
"""Test boolean parameter conversion."""
from letta.services.tool_sandbox.typescript_generator import convert_param_to_ts_value
assert convert_param_to_ts_value("boolean", True) == "true"
assert convert_param_to_ts_value("boolean", False) == "false"
def test_convert_param_to_ts_value_array(self):
"""Test array parameter conversion."""
from letta.services.tool_sandbox.typescript_generator import convert_param_to_ts_value
assert convert_param_to_ts_value("array", [1, 2, 3]) == "[1, 2, 3]"
def test_convert_param_to_ts_value_object(self):
"""Test object parameter conversion."""
from letta.services.tool_sandbox.typescript_generator import convert_param_to_ts_value
result = convert_param_to_ts_value("object", {"key": "value"})
assert result == '{"key": "value"}'
def test_extract_typescript_function_name(self):
"""Test TypeScript function name extraction."""
from letta.services.tool_sandbox.typescript_generator import extract_typescript_function_name
assert extract_typescript_function_name("export function myFunc(): void {}") == "myFunc"
assert extract_typescript_function_name("export async function asyncFunc(): Promise<void> {}") == "asyncFunc"
assert extract_typescript_function_name("function notExported(): void {}") is None
def test_is_async_typescript_function(self):
"""Test async function detection."""
from letta.services.tool_sandbox.typescript_generator import is_async_typescript_function
assert is_async_typescript_function("export async function test(): Promise<void> {}", "test") is True
assert is_async_typescript_function("export function test(): void {}", "test") is False
def test_generate_typescript_execution_script(self):
"""Test TypeScript execution script generation."""
from letta.services.tool_sandbox.typescript_generator import generate_typescript_execution_script
script = generate_typescript_execution_script(
tool_name="add",
tool_source_code="export function add(x: number, y: number): number { return x + y; }",
args={"x": 1, "y": 2},
json_schema={
"name": "add",
"parameters": {
"type": "object",
"properties": {
"x": {"type": "number"},
"y": {"type": "number"},
},
},
},
)
# Verify Letta client initialization is included (using dynamic import with try/catch)
assert "let client: any = null;" in script
assert "await import('@letta-ai/letta-client')" in script
assert "process.env.LETTA_API_KEY" in script
assert "} catch (e) {" in script # Graceful fallback if package not available
# Verify arguments are initialized
assert "const x = 1;" in script
assert "const y = 2;" in script
assert "function add" in script # 'export' is stripped for inline execution
assert "const _result = add(x, y);" in script
assert "JSON.stringify(_output);" in script
# Verify agent_state is null (not supported for TypeScript)
assert "agent_state: null" in script
def test_parse_typescript_result(self):
"""Test TypeScript result parsing."""
from letta.services.tool_sandbox.typescript_generator import parse_typescript_result
# Valid JSON result
result, agent_state = parse_typescript_result('{"results": 42, "agent_state": null}')
assert result == 42
assert agent_state is None
# Invalid JSON returns raw text
result, agent_state = parse_typescript_result("not json")
assert result == "not json"
assert agent_state is None
# Empty result
result, agent_state = parse_typescript_result("")
assert result is None
assert agent_state is None
def test_sandbox_tool_executor_skips_ast_for_typescript(self):
"""Test that SandboxToolExecutor._prepare_function_args skips AST parsing for TypeScript."""
from letta.schemas.tool import Tool as PydanticTool
from letta.services.tool_executor.sandbox_tool_executor import SandboxToolExecutor
ts_tool = PydanticTool(
name="ts_func",
description="Test TypeScript tool",
source_code="""
export function ts_func(a: number, b: string): string {
return b.repeat(a);
}
""",
source_type="typescript",
tool_type=ToolType.CUSTOM,
json_schema={
"name": "ts_func",
"parameters": {
"type": "object",
"properties": {
"a": {"type": "number"},
"b": {"type": "string"},
},
"required": ["a", "b"],
},
},
)
# This should NOT raise a SyntaxError - it should skip AST parsing for TypeScript
result = SandboxToolExecutor._prepare_function_args(
function_args={"a": 3, "b": "test"},
tool=ts_tool,
function_name="ts_func",
)
# Should return original args unchanged (no type coercion for TypeScript)
assert result == {"a": 3, "b": "test"}