From dd90067fab2c7e9b1cab4f2a0a27a4e0f08c2f7d Mon Sep 17 00:00:00 2001 From: cthomas Date: Sun, 18 May 2025 21:21:35 -0700 Subject: [PATCH] feat(asyncify): fully convert identities module (#2244) --- .../server/rest_api/routers/v1/identities.py | 33 +++++----- letta/services/identity_manager.py | 63 ++++++++++--------- tests/test_managers.py | 60 +++++++++--------- 3 files changed, 81 insertions(+), 75 deletions(-) diff --git a/letta/server/rest_api/routers/v1/identities.py b/letta/server/rest_api/routers/v1/identities.py index d563aab4..16cdbb26 100644 --- a/letta/server/rest_api/routers/v1/identities.py +++ b/letta/server/rest_api/routers/v1/identities.py @@ -82,15 +82,15 @@ async def retrieve_identity( @router.post("/", tags=["identities"], response_model=Identity, operation_id="create_identity") -def create_identity( +async def create_identity( identity: IdentityCreate = Body(...), server: "SyncServer" = Depends(get_letta_server), actor_id: Optional[str] = Header(None, alias="user_id"), # Extract user_id from header, default to None if not present x_project: Optional[str] = Header(None, alias="X-Project"), # Only handled by next js middleware ): try: - actor = server.user_manager.get_user_or_default(user_id=actor_id) - return server.identity_manager.create_identity(identity=identity, actor=actor) + actor = await server.user_manager.get_actor_or_default_async(actor_id=actor_id) + return await server.identity_manager.create_identity_async(identity=identity, actor=actor) except HTTPException: raise except UniqueConstraintViolationError: @@ -106,15 +106,15 @@ def create_identity( @router.put("/", tags=["identities"], response_model=Identity, operation_id="upsert_identity") -def upsert_identity( +async def upsert_identity( identity: IdentityUpsert = Body(...), server: "SyncServer" = Depends(get_letta_server), actor_id: Optional[str] = Header(None, alias="user_id"), # Extract user_id from header, default to None if not present x_project: Optional[str] = Header(None, alias="X-Project"), # Only handled by next js middleware ): try: - actor = server.user_manager.get_user_or_default(user_id=actor_id) - return server.identity_manager.upsert_identity(identity=identity, actor=actor) + actor = await server.user_manager.get_actor_or_default_async(actor_id=actor_id) + return await server.identity_manager.upsert_identity_async(identity=identity, actor=actor) except HTTPException: raise except NoResultFound as e: @@ -124,36 +124,33 @@ def upsert_identity( @router.patch("/{identity_id}", tags=["identities"], response_model=Identity, operation_id="update_identity") -def modify_identity( +async def modify_identity( identity_id: str, identity: IdentityUpdate = Body(...), server: "SyncServer" = Depends(get_letta_server), actor_id: Optional[str] = Header(None, alias="user_id"), # Extract user_id from header, default to None if not present ): try: - actor = server.user_manager.get_user_or_default(user_id=actor_id) - return server.identity_manager.update_identity(identity_id=identity_id, identity=identity, actor=actor) + actor = await server.user_manager.get_actor_or_default_async(actor_id=actor_id) + return await server.identity_manager.update_identity_async(identity_id=identity_id, identity=identity, actor=actor) except HTTPException: raise except NoResultFound as e: raise HTTPException(status_code=404, detail=str(e)) except Exception as e: - import traceback - - print(traceback.format_exc()) raise HTTPException(status_code=500, detail=f"{e}") @router.put("/{identity_id}/properties", tags=["identities"], operation_id="upsert_identity_properties") -def upsert_identity_properties( +async def upsert_identity_properties( identity_id: str, properties: List[IdentityProperty] = Body(...), server: "SyncServer" = Depends(get_letta_server), actor_id: Optional[str] = Header(None, alias="user_id"), # Extract user_id from header, default to None if not present ): try: - actor = server.user_manager.get_user_or_default(user_id=actor_id) - return server.identity_manager.upsert_identity_properties(identity_id=identity_id, properties=properties, actor=actor) + actor = await server.user_manager.get_actor_or_default_async(actor_id=actor_id) + return await server.identity_manager.upsert_identity_properties_async(identity_id=identity_id, properties=properties, actor=actor) except HTTPException: raise except NoResultFound as e: @@ -163,7 +160,7 @@ def upsert_identity_properties( @router.delete("/{identity_id}", tags=["identities"], operation_id="delete_identity") -def delete_identity( +async def delete_identity( identity_id: str, server: "SyncServer" = Depends(get_letta_server), actor_id: Optional[str] = Header(None, alias="user_id"), # Extract user_id from header, default to None if not present @@ -172,8 +169,8 @@ def delete_identity( Delete an identity by its identifier key """ try: - actor = server.user_manager.get_user_or_default(user_id=actor_id) - server.identity_manager.delete_identity(identity_id=identity_id, actor=actor) + actor = await server.user_manager.get_actor_or_default_async(actor_id=actor_id) + await server.identity_manager.delete_identity_async(identity_id=identity_id, actor=actor) except HTTPException: raise except NoResultFound as e: diff --git a/letta/services/identity_manager.py b/letta/services/identity_manager.py index 3531dc5c..590cedee 100644 --- a/letta/services/identity_manager.py +++ b/letta/services/identity_manager.py @@ -1,6 +1,7 @@ from typing import List, Optional from fastapi import HTTPException +from sqlalchemy import select from sqlalchemy.exc import NoResultFound from sqlalchemy.orm import Session @@ -53,11 +54,11 @@ class IdentityManager: return identity.to_pydantic() @enforce_types - def create_identity(self, identity: IdentityCreate, actor: PydanticUser) -> PydanticIdentity: - with db_registry.session() as session: + async def create_identity_async(self, identity: IdentityCreate, actor: PydanticUser) -> PydanticIdentity: + async with db_registry.async_session() as session: new_identity = IdentityModel(**identity.model_dump(exclude={"agent_ids", "block_ids"}, exclude_unset=True)) new_identity.organization_id = actor.organization_id - self._process_relationship( + await self._process_relationship_async( session=session, identity=new_identity, relationship_name="agents", @@ -65,7 +66,7 @@ class IdentityManager: item_ids=identity.agent_ids, allow_partial=False, ) - self._process_relationship( + await self._process_relationship_async( session=session, identity=new_identity, relationship_name="blocks", @@ -73,13 +74,13 @@ class IdentityManager: item_ids=identity.block_ids, allow_partial=False, ) - new_identity.create(session, actor=actor) + await new_identity.create_async(session, actor=actor) return new_identity.to_pydantic() @enforce_types - def upsert_identity(self, identity: IdentityUpsert, actor: PydanticUser) -> PydanticIdentity: - with db_registry.session() as session: - existing_identity = IdentityModel.read( + async def upsert_identity_async(self, identity: IdentityUpsert, actor: PydanticUser) -> PydanticIdentity: + async with db_registry.async_session() as session: + existing_identity = await IdentityModel.read_async( db_session=session, identifier_key=identity.identifier_key, project_id=identity.project_id, @@ -88,7 +89,7 @@ class IdentityManager: ) if existing_identity is None: - return self.create_identity(identity=IdentityCreate(**identity.model_dump()), actor=actor) + return await self.create_identity_async(identity=IdentityCreate(**identity.model_dump()), actor=actor) else: identity_update = IdentityUpdate( name=identity.name, @@ -97,25 +98,27 @@ class IdentityManager: agent_ids=identity.agent_ids, properties=identity.properties, ) - return self._update_identity( + return await self._update_identity_async( session=session, existing_identity=existing_identity, identity=identity_update, actor=actor, replace=True ) @enforce_types - def update_identity(self, identity_id: str, identity: IdentityUpdate, actor: PydanticUser, replace: bool = False) -> PydanticIdentity: - with db_registry.session() as session: + async def update_identity_async( + self, identity_id: str, identity: IdentityUpdate, actor: PydanticUser, replace: bool = False + ) -> PydanticIdentity: + async with db_registry.async_session() as session: try: - existing_identity = IdentityModel.read(db_session=session, identifier=identity_id, actor=actor) + existing_identity = await IdentityModel.read_async(db_session=session, identifier=identity_id, actor=actor) except NoResultFound: raise HTTPException(status_code=404, detail="Identity not found") if existing_identity.organization_id != actor.organization_id: raise HTTPException(status_code=403, detail="Forbidden") - return self._update_identity( + return await self._update_identity_async( session=session, existing_identity=existing_identity, identity=identity, actor=actor, replace=replace ) - def _update_identity( + async def _update_identity_async( self, session: Session, existing_identity: IdentityModel, @@ -139,7 +142,7 @@ class IdentityManager: existing_identity.properties = list(new_properties.values()) if identity.agent_ids is not None: - self._process_relationship( + await self._process_relationship_async( session=session, identity=existing_identity, relationship_name="agents", @@ -149,7 +152,7 @@ class IdentityManager: replace=replace, ) if identity.block_ids is not None: - self._process_relationship( + await self._process_relationship_async( session=session, identity=existing_identity, relationship_name="blocks", @@ -158,16 +161,18 @@ class IdentityManager: allow_partial=False, replace=replace, ) - existing_identity.update(session, actor=actor) + await existing_identity.update_async(session, actor=actor) return existing_identity.to_pydantic() @enforce_types - def upsert_identity_properties(self, identity_id: str, properties: List[IdentityProperty], actor: PydanticUser) -> PydanticIdentity: - with db_registry.session() as session: - existing_identity = IdentityModel.read(db_session=session, identifier=identity_id, actor=actor) + async def upsert_identity_properties_async( + self, identity_id: str, properties: List[IdentityProperty], actor: PydanticUser + ) -> PydanticIdentity: + async with db_registry.async_session() as session: + existing_identity = await IdentityModel.read_async(db_session=session, identifier=identity_id, actor=actor) if existing_identity is None: raise HTTPException(status_code=404, detail="Identity not found") - return self._update_identity( + return await self._update_identity_async( session=session, existing_identity=existing_identity, identity=IdentityUpdate(properties=properties), @@ -176,15 +181,15 @@ class IdentityManager: ) @enforce_types - def delete_identity(self, identity_id: str, actor: PydanticUser) -> None: - with db_registry.session() as session: - identity = IdentityModel.read(db_session=session, identifier=identity_id) + async def delete_identity_async(self, identity_id: str, actor: PydanticUser) -> None: + async with db_registry.async_session() as session: + identity = await IdentityModel.read_async(db_session=session, identifier=identity_id, actor=actor) if identity is None: raise HTTPException(status_code=404, detail="Identity not found") if identity.organization_id != actor.organization_id: raise HTTPException(status_code=403, detail="Forbidden") - session.delete(identity) - session.commit() + await session.delete(identity) + await session.commit() @enforce_types async def size_async( @@ -197,7 +202,7 @@ class IdentityManager: async with db_registry.async_session() as session: return await IdentityModel.size_async(db_session=session, actor=actor) - def _process_relationship( + async def _process_relationship_async( self, session: Session, identity: PydanticIdentity, @@ -214,7 +219,7 @@ class IdentityManager: return # Retrieve models for the provided IDs - found_items = session.query(model_class).filter(model_class.id.in_(item_ids)).all() + found_items = (await session.execute(select(model_class).where(model_class.id.in_(item_ids)))).scalars().all() # Validate all items are found if allow_partial is False if not allow_partial and len(found_items) != len(item_ids): diff --git a/tests/test_managers.py b/tests/test_managers.py index 05e77017..2549fb42 100644 --- a/tests/test_managers.py +++ b/tests/test_managers.py @@ -3507,7 +3507,7 @@ async def test_create_and_upsert_identity(server: SyncServer, default_user, even ], ) - identity = server.identity_manager.create_identity(identity_create, actor=default_user) + identity = await server.identity_manager.create_identity_async(identity_create, actor=default_user) # Assertions to ensure the created identity matches the expected values assert identity.identifier_key == identity_create.identifier_key @@ -3518,30 +3518,32 @@ async def test_create_and_upsert_identity(server: SyncServer, default_user, even assert identity.project_id == None with pytest.raises(UniqueConstraintViolationError): - server.identity_manager.create_identity( + await server.identity_manager.create_identity_async( IdentityCreate(identifier_key="1234", name="sarah", identity_type=IdentityType.user), actor=default_user, ) identity_create.properties = [(IdentityProperty(key="age", value=29, type=IdentityPropertyType.number))] - identity = server.identity_manager.upsert_identity(identity=IdentityUpsert(**identity_create.model_dump()), actor=default_user) + identity = await server.identity_manager.upsert_identity_async( + identity=IdentityUpsert(**identity_create.model_dump()), actor=default_user + ) identity = await server.identity_manager.get_identity_async(identity_id=identity.id, actor=default_user) assert len(identity.properties) == 1 assert identity.properties[0].key == "age" assert identity.properties[0].value == 29 - server.identity_manager.delete_identity(identity_id=identity.id, actor=default_user) + await server.identity_manager.delete_identity_async(identity_id=identity.id, actor=default_user) @pytest.mark.asyncio async def test_get_identities(server, default_user): # Create identities to retrieve later - user = server.identity_manager.create_identity( + user = await server.identity_manager.create_identity_async( IdentityCreate(name="caren", identifier_key="1234", identity_type=IdentityType.user), actor=default_user ) - org = server.identity_manager.create_identity( + org = await server.identity_manager.create_identity_async( IdentityCreate(name="letta", identifier_key="0001", identity_type=IdentityType.org), actor=default_user ) @@ -3557,13 +3559,13 @@ async def test_get_identities(server, default_user): assert len(org_identities) == 1 assert org_identities[0].name == org.name - server.identity_manager.delete_identity(identity_id=user.id, actor=default_user) - server.identity_manager.delete_identity(identity_id=org.id, actor=default_user) + await server.identity_manager.delete_identity_async(identity_id=user.id, actor=default_user) + await server.identity_manager.delete_identity_async(identity_id=org.id, actor=default_user) @pytest.mark.asyncio async def test_update_identity(server: SyncServer, sarah_agent, charles_agent, default_user, event_loop): - identity = server.identity_manager.create_identity( + identity = await server.identity_manager.create_identity_async( IdentityCreate(name="caren", identifier_key="1234", identity_type=IdentityType.user), actor=default_user ) @@ -3572,7 +3574,7 @@ async def test_update_identity(server: SyncServer, sarah_agent, charles_agent, d agent_ids=[sarah_agent.id, charles_agent.id], properties=[IdentityProperty(key="email", value="caren@letta.com", type=IdentityPropertyType.string)], ) - server.identity_manager.update_identity(identity_id=identity.id, identity=update_data, actor=default_user) + await server.identity_manager.update_identity_async(identity_id=identity.id, identity=update_data, actor=default_user) # Retrieve the updated identity updated_identity = await server.identity_manager.get_identity_async(identity_id=identity.id, actor=default_user) @@ -3586,16 +3588,16 @@ async def test_update_identity(server: SyncServer, sarah_agent, charles_agent, d agent_state = await server.agent_manager.get_agent_by_id_async(agent_id=charles_agent.id, actor=default_user) assert identity.id in agent_state.identity_ids - server.identity_manager.delete_identity(identity_id=identity.id, actor=default_user) + await server.identity_manager.delete_identity_async(identity_id=identity.id, actor=default_user) @pytest.mark.asyncio async def test_attach_detach_identity_from_agent(server: SyncServer, sarah_agent, default_user, event_loop): # Create an identity - identity = server.identity_manager.create_identity( + identity = await server.identity_manager.create_identity_async( IdentityCreate(name="caren", identifier_key="1234", identity_type=IdentityType.user), actor=default_user ) - agent_state = server.agent_manager.update_agent( + agent_state = await server.agent_manager.update_agent_async( agent_id=sarah_agent.id, agent_update=UpdateAgent(identity_ids=[identity.id]), actor=default_user ) @@ -3603,7 +3605,7 @@ async def test_attach_detach_identity_from_agent(server: SyncServer, sarah_agent assert identity.id in agent_state.identity_ids # Now attempt to delete the identity - server.identity_manager.delete_identity(identity_id=identity.id, actor=default_user) + await server.identity_manager.delete_identity_async(identity_id=identity.id, actor=default_user) # Verify that the identity was deleted identities = await server.identity_manager.list_identities_async(actor=default_user) @@ -3614,13 +3616,14 @@ async def test_attach_detach_identity_from_agent(server: SyncServer, sarah_agent assert not identity.id in agent_state.identity_ids -def test_get_set_agents_for_identities(server: SyncServer, sarah_agent, charles_agent, default_user): - identity = server.identity_manager.create_identity( +@pytest.mark.asyncio +async def test_get_set_agents_for_identities(server: SyncServer, sarah_agent, charles_agent, default_user, event_loop): + identity = await server.identity_manager.create_identity_async( IdentityCreate(name="caren", identifier_key="1234", identity_type=IdentityType.user, agent_ids=[sarah_agent.id, charles_agent.id]), actor=default_user, ) - agent_with_identity = server.create_agent( + agent_with_identity = await server.create_agent_async( CreateAgent( memory_blocks=[], llm_config=LLMConfig.default_config("gpt-4o-mini"), @@ -3641,7 +3644,7 @@ def test_get_set_agents_for_identities(server: SyncServer, sarah_agent, charles_ ) # Get the agents for identity id - agent_states = server.agent_manager.list_agents(identity_id=identity.id, actor=default_user) + agent_states = await server.agent_manager.list_agents_async(identity_id=identity.id, actor=default_user) assert len(agent_states) == 3 # Check all agents are in the list @@ -3652,7 +3655,7 @@ def test_get_set_agents_for_identities(server: SyncServer, sarah_agent, charles_ assert not agent_without_identity.id in agent_state_ids # Get the agents for identifier key - agent_states = server.agent_manager.list_agents(identifier_keys=[identity.identifier_key], actor=default_user) + agent_states = await server.agent_manager.list_agents_async(identifier_keys=[identity.identifier_key], actor=default_user) assert len(agent_states) == 3 # Check all agents are in the list @@ -3675,13 +3678,13 @@ def test_get_set_agents_for_identities(server: SyncServer, sarah_agent, charles_ assert sarah_agent.id in agent_state_ids assert charles_agent.id in agent_state_ids - server.identity_manager.delete_identity(identity_id=identity.id, actor=default_user) + await server.identity_manager.delete_identity_async(identity_id=identity.id, actor=default_user) @pytest.mark.asyncio async def test_attach_detach_identity_from_block(server: SyncServer, default_block, default_user, event_loop): # Create an identity - identity = server.identity_manager.create_identity( + identity = await server.identity_manager.create_identity_async( IdentityCreate(name="caren", identifier_key="1234", identity_type=IdentityType.user, block_ids=[default_block.id]), actor=default_user, ) @@ -3691,7 +3694,7 @@ async def test_attach_detach_identity_from_block(server: SyncServer, default_blo assert len(blocks) == 1 and blocks[0].id == default_block.id # Now attempt to delete the identity - server.identity_manager.delete_identity(identity_id=identity.id, actor=default_user) + await server.identity_manager.delete_identity_async(identity_id=identity.id, actor=default_user) # Verify that the identity was deleted identities = await server.identity_manager.list_identities_async(actor=default_user) @@ -3707,7 +3710,7 @@ async def test_get_set_blocks_for_identities(server: SyncServer, default_block, block_manager = BlockManager() block_with_identity = block_manager.create_or_update_block(PydanticBlock(label="persona", value="Original Content"), actor=default_user) block_without_identity = block_manager.create_or_update_block(PydanticBlock(label="user", value="Original Content"), actor=default_user) - identity = server.identity_manager.create_identity( + identity = await server.identity_manager.create_identity_async( IdentityCreate( name="caren", identifier_key="1234", identity_type=IdentityType.user, block_ids=[default_block.id, block_with_identity.id] ), @@ -3748,10 +3751,11 @@ async def test_get_set_blocks_for_identities(server: SyncServer, default_block, assert not block_with_identity.id in block_ids assert not block_without_identity.id in block_ids - server.identity_manager.delete_identity(identity.id, actor=default_user) + await server.identity_manager.delete_identity_async(identity_id=identity.id, actor=default_user) -def test_upsert_properties(server: SyncServer, default_user): +@pytest.mark.asyncio +async def test_upsert_properties(server: SyncServer, default_user, event_loop): identity_create = IdentityCreate( identifier_key="1234", name="caren", @@ -3762,21 +3766,21 @@ def test_upsert_properties(server: SyncServer, default_user): ], ) - identity = server.identity_manager.create_identity(identity_create, actor=default_user) + identity = await server.identity_manager.create_identity_async(identity_create, actor=default_user) properties = [ IdentityProperty(key="email", value="caren@gmail.com", type=IdentityPropertyType.string), IdentityProperty(key="age", value="28", type=IdentityPropertyType.string), IdentityProperty(key="test", value=123, type=IdentityPropertyType.number), ] - updated_identity = server.identity_manager.upsert_identity_properties( + updated_identity = await server.identity_manager.upsert_identity_properties_async( identity_id=identity.id, properties=properties, actor=default_user, ) assert updated_identity.properties == properties - server.identity_manager.delete_identity(identity.id, actor=default_user) + await server.identity_manager.delete_identity_async(identity_id=identity.id, actor=default_user) # ======================================================================================================================