From ac69cd0bb8cee0ea27ecab03cd4f00c0ba591999 Mon Sep 17 00:00:00 2001 From: cthomas Date: Tue, 27 May 2025 12:41:44 -0700 Subject: [PATCH] feat(asyncify): migrate delete passage (#2458) --- letta/server/rest_api/routers/v1/agents.py | 6 ++-- letta/server/server.py | 11 ++++++ letta/services/passage_manager.py | 39 ++++++++++++++++++++++ 3 files changed, 53 insertions(+), 3 deletions(-) diff --git a/letta/server/rest_api/routers/v1/agents.py b/letta/server/rest_api/routers/v1/agents.py index c29903dc..63c51a10 100644 --- a/letta/server/rest_api/routers/v1/agents.py +++ b/letta/server/rest_api/routers/v1/agents.py @@ -549,7 +549,7 @@ def modify_passage( # TODO(ethan): query or path parameter for memory_id? # @router.delete("/{agent_id}/archival") @router.delete("/{agent_id}/archival-memory/{memory_id}", response_model=None, operation_id="delete_passage") -def delete_passage( +async def delete_passage( agent_id: str, memory_id: str, # memory_id: str = Query(..., description="Unique ID of the memory to be deleted."), @@ -559,9 +559,9 @@ def delete_passage( """ Delete a memory from an agent's archival memory store. """ - actor = server.user_manager.get_user_or_default(user_id=actor_id) + actor = await server.user_manager.get_actor_or_default_async(actor_id=actor_id) - server.delete_archival_memory(memory_id=memory_id, actor=actor) + await server.delete_archival_memory_async(memory_id=memory_id, actor=actor) return JSONResponse(status_code=status.HTTP_200_OK, content={"message": f"Memory id={memory_id} successfully deleted"}) diff --git a/letta/server/server.py b/letta/server/server.py index af2a97e9..2042da70 100644 --- a/letta/server/server.py +++ b/letta/server/server.py @@ -1158,6 +1158,17 @@ class SyncServer(Server): # rebuild system prompt and force self.agent_manager.rebuild_system_prompt(agent_id=passage.agent_id, actor=actor, force=True) + async def delete_archival_memory_async(self, memory_id: str, actor: User): + # TODO check if it exists first, and throw error if not + # TODO: need to also rebuild the prompt here + passage = await self.passage_manager.get_passage_by_id_async(passage_id=memory_id, actor=actor) + + # delete the passage + await self.passage_manager.delete_passage_by_id_async(passage_id=memory_id, actor=actor) + + # rebuild system prompt and force + await self.agent_manager.rebuild_system_prompt_async(agent_id=passage.agent_id, actor=actor, force=True) + def get_agent_recall( self, user_id: str, diff --git a/letta/services/passage_manager.py b/letta/services/passage_manager.py index f8139955..8f057bde 100644 --- a/letta/services/passage_manager.py +++ b/letta/services/passage_manager.py @@ -58,6 +58,23 @@ class PassageManager: except NoResultFound: raise NoResultFound(f"Passage with id {passage_id} not found in database.") + @enforce_types + @trace_method + async def get_passage_by_id_async(self, passage_id: str, actor: PydanticUser) -> Optional[PydanticPassage]: + """Fetch a passage by ID.""" + async with db_registry.async_session() as session: + # Try source passages first + try: + passage = await SourcePassage.read_async(db_session=session, identifier=passage_id, actor=actor) + return passage.to_pydantic() + except NoResultFound: + # Try archival passages + try: + passage = await AgentPassage.read_async(db_session=session, identifier=passage_id, actor=actor) + return passage.to_pydantic() + except NoResultFound: + raise NoResultFound(f"Passage with id {passage_id} not found in database.") + @enforce_types @trace_method def create_passage(self, pydantic_passage: PydanticPassage, actor: PydanticUser) -> PydanticPassage: @@ -331,6 +348,28 @@ class PassageManager: except NoResultFound: raise NoResultFound(f"Passage with id {passage_id} not found.") + @enforce_types + @trace_method + async def delete_passage_by_id_async(self, passage_id: str, actor: PydanticUser) -> bool: + """Delete a passage from either source or archival passages.""" + if not passage_id: + raise ValueError("Passage ID must be provided.") + + async with db_registry.async_session() as session: + # Try source passages first + try: + passage = await SourcePassage.read_async(db_session=session, identifier=passage_id, actor=actor) + await passage.hard_delete(session, actor=actor) + return True + except NoResultFound: + # Try archival passages + try: + passage = await AgentPassage.read_async(db_session=session, identifier=passage_id, actor=actor) + await passage.hard_delete_async(session, actor=actor) + return True + except NoResultFound: + raise NoResultFound(f"Passage with id {passage_id} not found.") + @enforce_types @trace_method def delete_passages(