feat(asyncify): migrate batch sbox methods (#2396)
This commit is contained in:
@@ -248,7 +248,7 @@ class LettaAgentBatch(BaseAgent):
|
||||
await self._mark_steps_complete_async(llm_batch_id, ctx.agent_ids)
|
||||
|
||||
log_event(name="prepare_next")
|
||||
next_reqs, next_step_state = self._prepare_next_iteration(exec_results, ctx, msg_map)
|
||||
next_reqs, next_step_state = await self._prepare_next_iteration_async(exec_results, ctx, msg_map)
|
||||
if len(next_reqs) == 0:
|
||||
await self.job_manager.update_job_by_id_async(
|
||||
job_id=letta_batch_id, job_update=JobUpdate(status=JobStatus.completed), actor=self.actor
|
||||
@@ -389,15 +389,15 @@ class LettaAgentBatch(BaseAgent):
|
||||
if updates:
|
||||
await self.batch_manager.bulk_update_llm_batch_items_request_status_by_agent_async(updates=updates)
|
||||
|
||||
def _build_sandbox(self) -> Tuple[SandboxConfig, Dict[str, Any]]:
|
||||
async def _build_sandbox(self) -> Tuple[SandboxConfig, Dict[str, Any]]:
|
||||
sbx_type = SandboxType.E2B if tool_settings.e2b_api_key else SandboxType.LOCAL
|
||||
cfg = self.sandbox_config_manager.get_or_create_default_sandbox_config(sandbox_type=sbx_type, actor=self.actor)
|
||||
env = self.sandbox_config_manager.get_sandbox_env_vars_as_dict(cfg.id, actor=self.actor, limit=100)
|
||||
cfg = await self.sandbox_config_manager.get_or_create_default_sandbox_config_async(sandbox_type=sbx_type, actor=self.actor)
|
||||
env = await self.sandbox_config_manager.get_sandbox_env_vars_as_dict_async(cfg.id, actor=self.actor, limit=100)
|
||||
return cfg, env
|
||||
|
||||
@trace_method
|
||||
async def _execute_tools(self, ctx: _ResumeContext) -> Sequence[tuple[str, ToolExecutionResult]]:
|
||||
sbx_cfg, sbx_env = self._build_sandbox()
|
||||
async def _execute_tools(self, ctx: _ResumeContext) -> Sequence[Tuple[str, Tuple[str, bool]]]:
|
||||
sbx_cfg, sbx_env = await self._build_sandbox()
|
||||
rethink_memory_tool_name = "rethink_memory"
|
||||
tool_params = []
|
||||
# TODO: This is a special case - we need to think about how to generalize this
|
||||
@@ -484,7 +484,7 @@ class LettaAgentBatch(BaseAgent):
|
||||
]
|
||||
await self.batch_manager.bulk_update_llm_batch_items_step_status_by_agent_async(updates)
|
||||
|
||||
def _prepare_next_iteration(
|
||||
async def _prepare_next_iteration_async(
|
||||
self,
|
||||
exec_results: Sequence[Tuple[str, "ToolExecutionResult"]],
|
||||
ctx: _ResumeContext,
|
||||
@@ -513,7 +513,7 @@ class LettaAgentBatch(BaseAgent):
|
||||
for aid, new_msgs in msg_map.items():
|
||||
ast = ctx.agent_state_map[aid]
|
||||
if not ast.message_buffer_autoclear:
|
||||
self.agent_manager.set_in_context_messages(
|
||||
await self.agent_manager.set_in_context_messages_async(
|
||||
agent_id=aid,
|
||||
message_ids=ast.message_ids + [m.id for m in new_msgs],
|
||||
actor=self.actor,
|
||||
|
||||
@@ -638,7 +638,7 @@ async def send_message(
|
||||
actor = await server.user_manager.get_actor_or_default_async(actor_id=actor_id)
|
||||
# TODO: This is redundant, remove soon
|
||||
agent = await server.agent_manager.get_agent_by_id_async(agent_id, actor)
|
||||
agent_eligible = agent.enable_sleeptime or not agent.multi_agent_group
|
||||
agent_eligible = (agent.enable_sleeptime or not agent.multi_agent_group) or agent.agent_type == AgentType.sleeptime_agent
|
||||
experimental_header = request_obj.headers.get("X-EXPERIMENTAL") or "false"
|
||||
feature_enabled = settings.use_experimental or experimental_header.lower() == "true"
|
||||
model_compatible = agent.llm_config.model_endpoint_type in ["anthropic", "openai", "together", "google_ai", "google_vertex"]
|
||||
|
||||
@@ -1 +1 @@
|
||||
{}
|
||||
{}
|
||||
|
||||
Reference in New Issue
Block a user