Files
letta-server/letta/templates/sandbox_code_file_async.py.j2

83 lines
2.7 KiB
Django/Jinja

{{ 'from __future__ import annotations' if future_import else '' }}
from typing import *
import pickle
import sys
import base64
import struct
import hashlib
import asyncio
{# Additional imports to support agent state #}
{% if inject_agent_state %}
import letta
from letta import *
{% endif %}
{# Add schema code if available #}
{{ schema_imports or '' }}
{# Load agent state #}
agent_state = {{ 'pickle.loads(' ~ agent_state_pickle ~ ')' if agent_state_pickle else 'None' }}
{{ tool_args }}
{# The tool's source code #}
{{ tool_source_code }}
{# Async wrapper to handle the function call and store the result #}
async def _async_wrapper():
_function_result = await {{ invoke_function_call }}
{# Use a temporary Pydantic wrapper to recursively serialize any nested Pydantic objects #}
try:
from pydantic import BaseModel
from typing import Any
class _TempResultWrapper(BaseModel):
result: Any
class Config:
arbitrary_types_allowed = True
_wrapped = _TempResultWrapper(result=_function_result)
_serialized_result = _wrapped.model_dump()['result']
except ImportError:
# Pydantic not available in sandbox, fall back to string conversion
print("Pydantic not available in sandbox environment, falling back to string conversion")
_serialized_result = str(_function_result)
except Exception as e:
# If wrapping fails, print the error and stringify the result
print(f"Failed to serialize result with Pydantic wrapper: {e}")
_serialized_result = str(_function_result)
return {
"results": _serialized_result,
"agent_state": agent_state
}
{# Run the async function - method depends on environment #}
{% if use_top_level_await %}
{# Environment with running event loop (like E2B) - use top-level await #}
{{ local_sandbox_result_var_name }} = await _async_wrapper()
{% else %}
{# Local execution environment - use asyncio.run #}
{{ local_sandbox_result_var_name }} = asyncio.run(_async_wrapper())
{% endif %}
{{ local_sandbox_result_var_name }}_pkl = pickle.dumps({{ local_sandbox_result_var_name }})
{% if wrap_print_with_markers %}
{# Combine everything to flush and write at once. #}
data_checksum = hashlib.md5({{ local_sandbox_result_var_name }}_pkl).hexdigest().encode('ascii')
{{ local_sandbox_result_var_name }}_msg = (
{{ start_marker }} +
struct.pack('>I', len({{ local_sandbox_result_var_name }}_pkl)) +
data_checksum +
{{ local_sandbox_result_var_name }}_pkl
)
sys.stdout.buffer.write({{ local_sandbox_result_var_name }}_msg)
sys.stdout.buffer.flush()
{% else %}
base64.b64encode({{ local_sandbox_result_var_name }}_pkl).decode('utf-8')
{% endif %}