diff --git a/letta/server/server.py b/letta/server/server.py index 80001a02..3a03272d 100644 --- a/letta/server/server.py +++ b/letta/server/server.py @@ -852,9 +852,9 @@ class SyncServer(Server): if request.enable_sleeptime: if request.agent_type == AgentType.voice_convo_agent: - main_agent = self.create_voice_sleeptime_agent(main_agent=main_agent, actor=actor) + main_agent = await self.create_voice_sleeptime_agent_async(main_agent=main_agent, actor=actor) else: - main_agent = self.create_sleeptime_agent(main_agent=main_agent, actor=actor) + main_agent = await self.create_sleeptime_agent_async(main_agent=main_agent, actor=actor) return main_agent @@ -897,12 +897,12 @@ class SyncServer(Server): request.embedding_config = await self.get_embedding_config_from_handle_async(handle=request.embedding, actor=actor) if request.enable_sleeptime: - agent = self.agent_manager.get_agent_by_id(agent_id=agent_id, actor=actor) + agent = await self.agent_manager.get_agent_by_id_async(agent_id=agent_id, actor=actor) if agent.multi_agent_group is None: if agent.agent_type == AgentType.voice_convo_agent: - self.create_voice_sleeptime_agent(main_agent=agent, actor=actor) + await self.create_voice_sleeptime_agent_async(main_agent=agent, actor=actor) else: - self.create_sleeptime_agent(main_agent=agent, actor=actor) + await self.create_sleeptime_agent_async(main_agent=agent, actor=actor) return await self.agent_manager.update_agent_async( agent_id=agent_id, @@ -942,6 +942,38 @@ class SyncServer(Server): ) return self.agent_manager.get_agent_by_id(agent_id=main_agent.id, actor=actor) + async def create_sleeptime_agent_async(self, main_agent: AgentState, actor: User) -> AgentState: + request = CreateAgent( + name=main_agent.name + "-sleeptime", + agent_type=AgentType.sleeptime_agent, + block_ids=[block.id for block in main_agent.memory.blocks], + memory_blocks=[ + CreateBlock( + label="memory_persona", + value=get_persona_text("sleeptime_memory_persona"), + ), + ], + llm_config=main_agent.llm_config, + embedding_config=main_agent.embedding_config, + project_id=main_agent.project_id, + ) + sleeptime_agent = await self.agent_manager.create_agent_async( + agent_create=request, + actor=actor, + ) + await self.group_manager.create_group_async( + group=GroupCreate( + description="", + agent_ids=[sleeptime_agent.id], + manager_config=SleeptimeManager( + manager_agent_id=main_agent.id, + sleeptime_agent_frequency=5, + ), + ), + actor=actor, + ) + return await self.agent_manager.get_agent_by_id_async(agent_id=main_agent.id, actor=actor) + def create_voice_sleeptime_agent(self, main_agent: AgentState, actor: User) -> AgentState: # TODO: Inject system request = CreateAgent( @@ -976,6 +1008,40 @@ class SyncServer(Server): ) return self.agent_manager.get_agent_by_id(agent_id=main_agent.id, actor=actor) + async def create_voice_sleeptime_agent_async(self, main_agent: AgentState, actor: User) -> AgentState: + # TODO: Inject system + request = CreateAgent( + name=main_agent.name + "-sleeptime", + agent_type=AgentType.voice_sleeptime_agent, + block_ids=[block.id for block in main_agent.memory.blocks], + memory_blocks=[ + CreateBlock( + label="memory_persona", + value=get_persona_text("voice_memory_persona"), + ), + ], + llm_config=LLMConfig.default_config("gpt-4.1"), + embedding_config=main_agent.embedding_config, + project_id=main_agent.project_id, + ) + voice_sleeptime_agent = await self.agent_manager.create_agent_async( + agent_create=request, + actor=actor, + ) + await self.group_manager.create_group_async( + group=GroupCreate( + description="Low latency voice chat with async memory management.", + agent_ids=[voice_sleeptime_agent.id], + manager_config=VoiceSleeptimeManager( + manager_agent_id=main_agent.id, + max_message_buffer_length=constants.DEFAULT_MAX_MESSAGE_BUFFER_LENGTH, + min_message_buffer_length=constants.DEFAULT_MIN_MESSAGE_BUFFER_LENGTH, + ), + ), + actor=actor, + ) + return await self.agent_manager.get_agent_by_id_async(agent_id=main_agent.id, actor=actor) + # convert name->id # TODO: These can be moved to agent_manager diff --git a/letta/services/group_manager.py b/letta/services/group_manager.py index 4bce5825..a7895a52 100644 --- a/letta/services/group_manager.py +++ b/letta/services/group_manager.py @@ -1,5 +1,6 @@ from typing import List, Optional +from sqlalchemy import select from sqlalchemy.orm import Session from letta.orm.agent import Agent as AgentModel @@ -97,6 +98,51 @@ class GroupManager: new_group.create(session, actor=actor) return new_group.to_pydantic() + @enforce_types + async def create_group_async(self, group: GroupCreate, actor: PydanticUser) -> PydanticGroup: + async with db_registry.async_session() as session: + new_group = GroupModel() + new_group.organization_id = actor.organization_id + new_group.description = group.description + + match group.manager_config.manager_type: + case ManagerType.round_robin: + new_group.manager_type = ManagerType.round_robin + new_group.max_turns = group.manager_config.max_turns + case ManagerType.dynamic: + new_group.manager_type = ManagerType.dynamic + new_group.manager_agent_id = group.manager_config.manager_agent_id + new_group.max_turns = group.manager_config.max_turns + new_group.termination_token = group.manager_config.termination_token + case ManagerType.supervisor: + new_group.manager_type = ManagerType.supervisor + new_group.manager_agent_id = group.manager_config.manager_agent_id + case ManagerType.sleeptime: + new_group.manager_type = ManagerType.sleeptime + new_group.manager_agent_id = group.manager_config.manager_agent_id + new_group.sleeptime_agent_frequency = group.manager_config.sleeptime_agent_frequency + if new_group.sleeptime_agent_frequency: + new_group.turns_counter = -1 + case ManagerType.voice_sleeptime: + new_group.manager_type = ManagerType.voice_sleeptime + new_group.manager_agent_id = group.manager_config.manager_agent_id + max_message_buffer_length = group.manager_config.max_message_buffer_length + min_message_buffer_length = group.manager_config.min_message_buffer_length + # Safety check for buffer length range + self.ensure_buffer_length_range_valid(max_value=max_message_buffer_length, min_value=min_message_buffer_length) + new_group.max_message_buffer_length = max_message_buffer_length + new_group.min_message_buffer_length = min_message_buffer_length + case _: + raise ValueError(f"Unsupported manager type: {group.manager_config.manager_type}") + + await self._process_agent_relationship_async(session=session, group=new_group, agent_ids=group.agent_ids, allow_partial=False) + + if group.shared_block_ids: + await self._process_shared_block_relationship_async(session=session, group=new_group, block_ids=group.shared_block_ids) + + await new_group.create_async(session, actor=actor) + return new_group.to_pydantic() + @trace_method @enforce_types def modify_group(self, group_id: str, group_update: GroupUpdate, actor: PydanticUser) -> PydanticGroup: @@ -313,6 +359,38 @@ class GroupManager: else: raise ValueError("Extend relationship is not supported for groups.") + async def _process_agent_relationship_async(self, session, group: GroupModel, agent_ids: List[str], allow_partial=False, replace=True): + if not agent_ids: + if replace: + setattr(group, "agents", []) + setattr(group, "agent_ids", []) + return + + if group.manager_type == ManagerType.dynamic and len(agent_ids) != len(set(agent_ids)): + raise ValueError("Duplicate agent ids found in list") + + # Retrieve models for the provided IDs + query = select(AgentModel).where(AgentModel.id.in_(agent_ids)) + result = await session.execute(query) + found_items = result.scalars().all() + + # Validate all items are found if allow_partial is False + if not allow_partial and len(found_items) != len(agent_ids): + missing = set(agent_ids) - {item.id for item in found_items} + raise NoResultFound(f"Items not found in agents: {missing}") + + if group.manager_type == ManagerType.dynamic: + names = [item.name for item in found_items] + if len(names) != len(set(names)): + raise ValueError("Duplicate agent names found in the provided agent IDs.") + + if replace: + # Replace the relationship + setattr(group, "agents", found_items) + setattr(group, "agent_ids", agent_ids) + else: + raise ValueError("Extend relationship is not supported for groups.") + def _process_shared_block_relationship( self, session: Session, @@ -340,6 +418,39 @@ class GroupManager: for block in blocks: session.add(BlocksAgents(agent_id=manager_agent.id, block_id=block.id, block_label=block.label)) + async def _process_shared_block_relationship_async( + self, + session, + group: GroupModel, + block_ids: List[str], + ): + """Process shared block relationships for a group and its agents.""" + from letta.orm import Agent, Block, BlocksAgents + + # Add blocks to group + query = select(Block).where(Block.id.in_(block_ids)) + result = await session.execute(query) + blocks = result.scalars().all() + group.shared_blocks = blocks + + # Add blocks to all agents + if group.agent_ids: + query = select(Agent).where(Agent.id.in_(group.agent_ids)) + result = await session.execute(query) + agents = result.scalars().all() + for agent in agents: + for block in blocks: + session.add(BlocksAgents(agent_id=agent.id, block_id=block.id, block_label=block.label)) + + # Add blocks to manager agent if exists + if group.manager_agent_id: + query = select(Agent).where(Agent.id == group.manager_agent_id) + result = await session.execute(query) + manager_agent = result.scalar_one_or_none() + if manager_agent: + for block in blocks: + session.add(BlocksAgents(agent_id=manager_agent.id, block_id=block.id, block_label=block.label)) + @staticmethod def ensure_buffer_length_range_valid( max_value: Optional[int], diff --git a/letta/services/telemetry_manager.py b/letta/services/telemetry_manager.py index a57474b1..2dc14ca9 100644 --- a/letta/services/telemetry_manager.py +++ b/letta/services/telemetry_manager.py @@ -38,6 +38,13 @@ class TelemetryManager: def create_provider_trace(self, actor: PydanticUser, provider_trace_create: ProviderTraceCreate) -> PydanticProviderTrace: with db_registry.session() as session: provider_trace = ProviderTraceModel(**provider_trace_create.model_dump()) + if provider_trace_create.request_json: + request_json_str = json_dumps(provider_trace_create.request_json) + provider_trace.request_json = json_loads(request_json_str) + + if provider_trace_create.response_json: + response_json_str = json_dumps(provider_trace_create.response_json) + provider_trace.response_json = json_loads(response_json_str) provider_trace.create(session, actor=actor) return provider_trace.to_pydantic()