diff --git a/letta/server/rest_api/routers/v1/agents.py b/letta/server/rest_api/routers/v1/agents.py index 862b6f63..23a4104b 100644 --- a/letta/server/rest_api/routers/v1/agents.py +++ b/letta/server/rest_api/routers/v1/agents.py @@ -311,8 +311,8 @@ async def attach_source( actor = await server.user_manager.get_actor_or_default_async(actor_id=actor_id) agent = await server.agent_manager.attach_source_async(agent_id=agent_id, source_id=source_id, actor=actor) if agent.enable_sleeptime: - source = server.source_manager.get_source_by_id(source_id=source_id) - background_tasks.add_task(server.sleeptime_document_ingest, agent, source, actor) + source = await server.source_manager.get_source_by_id(source_id=source_id) + background_tasks.add_task(server.sleeptime_document_ingest_async, agent, source, actor) return agent diff --git a/letta/server/rest_api/routers/v1/sources.py b/letta/server/rest_api/routers/v1/sources.py index aef65688..9dc42808 100644 --- a/letta/server/rest_api/routers/v1/sources.py +++ b/letta/server/rest_api/routers/v1/sources.py @@ -267,4 +267,4 @@ async def sleeptime_document_ingest_async(server: SyncServer, source_id: str, ac agents = await server.source_manager.list_attached_agents(source_id=source_id, actor=actor) for agent in agents: if agent.enable_sleeptime: - server.sleeptime_document_ingest(agent, source, actor, clear_history) # TODO: make async + await server.sleeptime_document_ingest_async(agent, source, actor, clear_history) diff --git a/letta/server/server.py b/letta/server/server.py index b74fcee0..af2a97e9 100644 --- a/letta/server/server.py +++ b/letta/server/server.py @@ -1325,28 +1325,40 @@ class SyncServer(Server): return job - def sleeptime_document_ingest(self, main_agent: AgentState, source: Source, actor: User, clear_history: bool = False) -> None: - sleeptime_agent = self.create_document_sleeptime_agent(main_agent, source, actor, clear_history) - agent = self.load_agent(agent_id=sleeptime_agent.id, actor=actor) - for passage in self.list_data_source_passages(source_id=source.id, user_id=actor.id): - agent.step( + async def sleeptime_document_ingest_async( + self, main_agent: AgentState, source: Source, actor: User, clear_history: bool = False + ) -> None: + sleeptime_agent = await self.create_document_sleeptime_agent_async(main_agent, source, actor, clear_history) + sleeptime_agent = LettaAgent( + agent_id=sleeptime_agent.id, + message_manager=self.message_manager, + agent_manager=self.agent_manager, + block_manager=self.block_manager, + passage_manager=self.passage_manager, + actor=actor, + step_manager=self.step_manager, + telemetry_manager=self.telemetry_manager if settings.llm_api_logging else NoopTelemetryManager(), + ) + passages = await self.agent_manager.list_passages_async(actor=actor, source_id=source.id) + for passage in passages: + await sleeptime_agent.step( input_messages=[ MessageCreate(role="user", content=passage.text), ] ) - self.agent_manager.delete_agent(agent_id=sleeptime_agent.id, actor=actor) + await self.agent_manager.delete_agent_async(agent_id=sleeptime_agent.id, actor=actor) - def create_document_sleeptime_agent( + async def create_document_sleeptime_agent_async( self, main_agent: AgentState, source: Source, actor: User, clear_history: bool = False ) -> AgentState: try: - block = self.agent_manager.get_block_with_label(agent_id=main_agent.id, block_label=source.name, actor=actor) + block = await self.agent_manager.get_block_with_label_async(agent_id=main_agent.id, block_label=source.name, actor=actor) except: - block = self.block_manager.create_or_update_block(Block(label=source.name, value=""), actor=actor) - self.agent_manager.attach_block(agent_id=main_agent.id, block_id=block.id, actor=actor) + block = await self.block_manager.create_or_update_block_async(Block(label=source.name, value=""), actor=actor) + await self.agent_manager.attach_block_async(agent_id=main_agent.id, block_id=block.id, actor=actor) if clear_history and block.value != "": - block = self.block_manager.update_block(block_id=block.id, block=BlockUpdate(value="")) + block = await self.block_manager.update_block_async(block_id=block.id, block=BlockUpdate(value="")) request = CreateAgent( name=main_agent.name + "-doc-sleeptime", @@ -1369,7 +1381,7 @@ class SyncServer(Server): include_base_tools=False, tools=constants.BASE_SLEEPTIME_TOOLS, ) - return self.agent_manager.create_agent( + return await self.agent_manager.create_agent_async( agent_create=request, actor=actor, )