feat(asyncify): migrate sleeptime agent creation (#2361)

This commit is contained in:
cthomas
2025-05-23 14:38:37 -07:00
committed by GitHub
parent cac0865a6f
commit f0afef79bc
3 changed files with 189 additions and 5 deletions

View File

@@ -852,9 +852,9 @@ class SyncServer(Server):
if request.enable_sleeptime:
if request.agent_type == AgentType.voice_convo_agent:
main_agent = self.create_voice_sleeptime_agent(main_agent=main_agent, actor=actor)
main_agent = await self.create_voice_sleeptime_agent_async(main_agent=main_agent, actor=actor)
else:
main_agent = self.create_sleeptime_agent(main_agent=main_agent, actor=actor)
main_agent = await self.create_sleeptime_agent_async(main_agent=main_agent, actor=actor)
return main_agent
@@ -897,12 +897,12 @@ class SyncServer(Server):
request.embedding_config = await self.get_embedding_config_from_handle_async(handle=request.embedding, actor=actor)
if request.enable_sleeptime:
agent = self.agent_manager.get_agent_by_id(agent_id=agent_id, actor=actor)
agent = await self.agent_manager.get_agent_by_id_async(agent_id=agent_id, actor=actor)
if agent.multi_agent_group is None:
if agent.agent_type == AgentType.voice_convo_agent:
self.create_voice_sleeptime_agent(main_agent=agent, actor=actor)
await self.create_voice_sleeptime_agent_async(main_agent=agent, actor=actor)
else:
self.create_sleeptime_agent(main_agent=agent, actor=actor)
await self.create_sleeptime_agent_async(main_agent=agent, actor=actor)
return await self.agent_manager.update_agent_async(
agent_id=agent_id,
@@ -942,6 +942,38 @@ class SyncServer(Server):
)
return self.agent_manager.get_agent_by_id(agent_id=main_agent.id, actor=actor)
async def create_sleeptime_agent_async(self, main_agent: AgentState, actor: User) -> AgentState:
request = CreateAgent(
name=main_agent.name + "-sleeptime",
agent_type=AgentType.sleeptime_agent,
block_ids=[block.id for block in main_agent.memory.blocks],
memory_blocks=[
CreateBlock(
label="memory_persona",
value=get_persona_text("sleeptime_memory_persona"),
),
],
llm_config=main_agent.llm_config,
embedding_config=main_agent.embedding_config,
project_id=main_agent.project_id,
)
sleeptime_agent = await self.agent_manager.create_agent_async(
agent_create=request,
actor=actor,
)
await self.group_manager.create_group_async(
group=GroupCreate(
description="",
agent_ids=[sleeptime_agent.id],
manager_config=SleeptimeManager(
manager_agent_id=main_agent.id,
sleeptime_agent_frequency=5,
),
),
actor=actor,
)
return await self.agent_manager.get_agent_by_id_async(agent_id=main_agent.id, actor=actor)
def create_voice_sleeptime_agent(self, main_agent: AgentState, actor: User) -> AgentState:
# TODO: Inject system
request = CreateAgent(
@@ -976,6 +1008,40 @@ class SyncServer(Server):
)
return self.agent_manager.get_agent_by_id(agent_id=main_agent.id, actor=actor)
async def create_voice_sleeptime_agent_async(self, main_agent: AgentState, actor: User) -> AgentState:
# TODO: Inject system
request = CreateAgent(
name=main_agent.name + "-sleeptime",
agent_type=AgentType.voice_sleeptime_agent,
block_ids=[block.id for block in main_agent.memory.blocks],
memory_blocks=[
CreateBlock(
label="memory_persona",
value=get_persona_text("voice_memory_persona"),
),
],
llm_config=LLMConfig.default_config("gpt-4.1"),
embedding_config=main_agent.embedding_config,
project_id=main_agent.project_id,
)
voice_sleeptime_agent = await self.agent_manager.create_agent_async(
agent_create=request,
actor=actor,
)
await self.group_manager.create_group_async(
group=GroupCreate(
description="Low latency voice chat with async memory management.",
agent_ids=[voice_sleeptime_agent.id],
manager_config=VoiceSleeptimeManager(
manager_agent_id=main_agent.id,
max_message_buffer_length=constants.DEFAULT_MAX_MESSAGE_BUFFER_LENGTH,
min_message_buffer_length=constants.DEFAULT_MIN_MESSAGE_BUFFER_LENGTH,
),
),
actor=actor,
)
return await self.agent_manager.get_agent_by_id_async(agent_id=main_agent.id, actor=actor)
# convert name->id
# TODO: These can be moved to agent_manager

View File

@@ -1,5 +1,6 @@
from typing import List, Optional
from sqlalchemy import select
from sqlalchemy.orm import Session
from letta.orm.agent import Agent as AgentModel
@@ -97,6 +98,51 @@ class GroupManager:
new_group.create(session, actor=actor)
return new_group.to_pydantic()
@enforce_types
async def create_group_async(self, group: GroupCreate, actor: PydanticUser) -> PydanticGroup:
async with db_registry.async_session() as session:
new_group = GroupModel()
new_group.organization_id = actor.organization_id
new_group.description = group.description
match group.manager_config.manager_type:
case ManagerType.round_robin:
new_group.manager_type = ManagerType.round_robin
new_group.max_turns = group.manager_config.max_turns
case ManagerType.dynamic:
new_group.manager_type = ManagerType.dynamic
new_group.manager_agent_id = group.manager_config.manager_agent_id
new_group.max_turns = group.manager_config.max_turns
new_group.termination_token = group.manager_config.termination_token
case ManagerType.supervisor:
new_group.manager_type = ManagerType.supervisor
new_group.manager_agent_id = group.manager_config.manager_agent_id
case ManagerType.sleeptime:
new_group.manager_type = ManagerType.sleeptime
new_group.manager_agent_id = group.manager_config.manager_agent_id
new_group.sleeptime_agent_frequency = group.manager_config.sleeptime_agent_frequency
if new_group.sleeptime_agent_frequency:
new_group.turns_counter = -1
case ManagerType.voice_sleeptime:
new_group.manager_type = ManagerType.voice_sleeptime
new_group.manager_agent_id = group.manager_config.manager_agent_id
max_message_buffer_length = group.manager_config.max_message_buffer_length
min_message_buffer_length = group.manager_config.min_message_buffer_length
# Safety check for buffer length range
self.ensure_buffer_length_range_valid(max_value=max_message_buffer_length, min_value=min_message_buffer_length)
new_group.max_message_buffer_length = max_message_buffer_length
new_group.min_message_buffer_length = min_message_buffer_length
case _:
raise ValueError(f"Unsupported manager type: {group.manager_config.manager_type}")
await self._process_agent_relationship_async(session=session, group=new_group, agent_ids=group.agent_ids, allow_partial=False)
if group.shared_block_ids:
await self._process_shared_block_relationship_async(session=session, group=new_group, block_ids=group.shared_block_ids)
await new_group.create_async(session, actor=actor)
return new_group.to_pydantic()
@trace_method
@enforce_types
def modify_group(self, group_id: str, group_update: GroupUpdate, actor: PydanticUser) -> PydanticGroup:
@@ -313,6 +359,38 @@ class GroupManager:
else:
raise ValueError("Extend relationship is not supported for groups.")
async def _process_agent_relationship_async(self, session, group: GroupModel, agent_ids: List[str], allow_partial=False, replace=True):
if not agent_ids:
if replace:
setattr(group, "agents", [])
setattr(group, "agent_ids", [])
return
if group.manager_type == ManagerType.dynamic and len(agent_ids) != len(set(agent_ids)):
raise ValueError("Duplicate agent ids found in list")
# Retrieve models for the provided IDs
query = select(AgentModel).where(AgentModel.id.in_(agent_ids))
result = await session.execute(query)
found_items = result.scalars().all()
# Validate all items are found if allow_partial is False
if not allow_partial and len(found_items) != len(agent_ids):
missing = set(agent_ids) - {item.id for item in found_items}
raise NoResultFound(f"Items not found in agents: {missing}")
if group.manager_type == ManagerType.dynamic:
names = [item.name for item in found_items]
if len(names) != len(set(names)):
raise ValueError("Duplicate agent names found in the provided agent IDs.")
if replace:
# Replace the relationship
setattr(group, "agents", found_items)
setattr(group, "agent_ids", agent_ids)
else:
raise ValueError("Extend relationship is not supported for groups.")
def _process_shared_block_relationship(
self,
session: Session,
@@ -340,6 +418,39 @@ class GroupManager:
for block in blocks:
session.add(BlocksAgents(agent_id=manager_agent.id, block_id=block.id, block_label=block.label))
async def _process_shared_block_relationship_async(
self,
session,
group: GroupModel,
block_ids: List[str],
):
"""Process shared block relationships for a group and its agents."""
from letta.orm import Agent, Block, BlocksAgents
# Add blocks to group
query = select(Block).where(Block.id.in_(block_ids))
result = await session.execute(query)
blocks = result.scalars().all()
group.shared_blocks = blocks
# Add blocks to all agents
if group.agent_ids:
query = select(Agent).where(Agent.id.in_(group.agent_ids))
result = await session.execute(query)
agents = result.scalars().all()
for agent in agents:
for block in blocks:
session.add(BlocksAgents(agent_id=agent.id, block_id=block.id, block_label=block.label))
# Add blocks to manager agent if exists
if group.manager_agent_id:
query = select(Agent).where(Agent.id == group.manager_agent_id)
result = await session.execute(query)
manager_agent = result.scalar_one_or_none()
if manager_agent:
for block in blocks:
session.add(BlocksAgents(agent_id=manager_agent.id, block_id=block.id, block_label=block.label))
@staticmethod
def ensure_buffer_length_range_valid(
max_value: Optional[int],

View File

@@ -38,6 +38,13 @@ class TelemetryManager:
def create_provider_trace(self, actor: PydanticUser, provider_trace_create: ProviderTraceCreate) -> PydanticProviderTrace:
with db_registry.session() as session:
provider_trace = ProviderTraceModel(**provider_trace_create.model_dump())
if provider_trace_create.request_json:
request_json_str = json_dumps(provider_trace_create.request_json)
provider_trace.request_json = json_loads(request_json_str)
if provider_trace_create.response_json:
response_json_str = json_dumps(provider_trace_create.response_json)
provider_trace.response_json = json_loads(response_json_str)
provider_trace.create(session, actor=actor)
return provider_trace.to_pydantic()