feat(asyncify): fully convert identities module (#2244)

This commit is contained in:
cthomas
2025-05-18 21:21:35 -07:00
committed by GitHub
parent cec4b89c27
commit dd90067fab
3 changed files with 81 additions and 75 deletions

View File

@@ -82,15 +82,15 @@ async def retrieve_identity(
@router.post("/", tags=["identities"], response_model=Identity, operation_id="create_identity")
def create_identity(
async def create_identity(
identity: IdentityCreate = Body(...),
server: "SyncServer" = Depends(get_letta_server),
actor_id: Optional[str] = Header(None, alias="user_id"), # Extract user_id from header, default to None if not present
x_project: Optional[str] = Header(None, alias="X-Project"), # Only handled by next js middleware
):
try:
actor = server.user_manager.get_user_or_default(user_id=actor_id)
return server.identity_manager.create_identity(identity=identity, actor=actor)
actor = await server.user_manager.get_actor_or_default_async(actor_id=actor_id)
return await server.identity_manager.create_identity_async(identity=identity, actor=actor)
except HTTPException:
raise
except UniqueConstraintViolationError:
@@ -106,15 +106,15 @@ def create_identity(
@router.put("/", tags=["identities"], response_model=Identity, operation_id="upsert_identity")
def upsert_identity(
async def upsert_identity(
identity: IdentityUpsert = Body(...),
server: "SyncServer" = Depends(get_letta_server),
actor_id: Optional[str] = Header(None, alias="user_id"), # Extract user_id from header, default to None if not present
x_project: Optional[str] = Header(None, alias="X-Project"), # Only handled by next js middleware
):
try:
actor = server.user_manager.get_user_or_default(user_id=actor_id)
return server.identity_manager.upsert_identity(identity=identity, actor=actor)
actor = await server.user_manager.get_actor_or_default_async(actor_id=actor_id)
return await server.identity_manager.upsert_identity_async(identity=identity, actor=actor)
except HTTPException:
raise
except NoResultFound as e:
@@ -124,36 +124,33 @@ def upsert_identity(
@router.patch("/{identity_id}", tags=["identities"], response_model=Identity, operation_id="update_identity")
def modify_identity(
async def modify_identity(
identity_id: str,
identity: IdentityUpdate = Body(...),
server: "SyncServer" = Depends(get_letta_server),
actor_id: Optional[str] = Header(None, alias="user_id"), # Extract user_id from header, default to None if not present
):
try:
actor = server.user_manager.get_user_or_default(user_id=actor_id)
return server.identity_manager.update_identity(identity_id=identity_id, identity=identity, actor=actor)
actor = await server.user_manager.get_actor_or_default_async(actor_id=actor_id)
return await server.identity_manager.update_identity_async(identity_id=identity_id, identity=identity, actor=actor)
except HTTPException:
raise
except NoResultFound as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
import traceback
print(traceback.format_exc())
raise HTTPException(status_code=500, detail=f"{e}")
@router.put("/{identity_id}/properties", tags=["identities"], operation_id="upsert_identity_properties")
def upsert_identity_properties(
async def upsert_identity_properties(
identity_id: str,
properties: List[IdentityProperty] = Body(...),
server: "SyncServer" = Depends(get_letta_server),
actor_id: Optional[str] = Header(None, alias="user_id"), # Extract user_id from header, default to None if not present
):
try:
actor = server.user_manager.get_user_or_default(user_id=actor_id)
return server.identity_manager.upsert_identity_properties(identity_id=identity_id, properties=properties, actor=actor)
actor = await server.user_manager.get_actor_or_default_async(actor_id=actor_id)
return await server.identity_manager.upsert_identity_properties_async(identity_id=identity_id, properties=properties, actor=actor)
except HTTPException:
raise
except NoResultFound as e:
@@ -163,7 +160,7 @@ def upsert_identity_properties(
@router.delete("/{identity_id}", tags=["identities"], operation_id="delete_identity")
def delete_identity(
async def delete_identity(
identity_id: str,
server: "SyncServer" = Depends(get_letta_server),
actor_id: Optional[str] = Header(None, alias="user_id"), # Extract user_id from header, default to None if not present
@@ -172,8 +169,8 @@ def delete_identity(
Delete an identity by its identifier key
"""
try:
actor = server.user_manager.get_user_or_default(user_id=actor_id)
server.identity_manager.delete_identity(identity_id=identity_id, actor=actor)
actor = await server.user_manager.get_actor_or_default_async(actor_id=actor_id)
await server.identity_manager.delete_identity_async(identity_id=identity_id, actor=actor)
except HTTPException:
raise
except NoResultFound as e:

View File

@@ -1,6 +1,7 @@
from typing import List, Optional
from fastapi import HTTPException
from sqlalchemy import select
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import Session
@@ -53,11 +54,11 @@ class IdentityManager:
return identity.to_pydantic()
@enforce_types
def create_identity(self, identity: IdentityCreate, actor: PydanticUser) -> PydanticIdentity:
with db_registry.session() as session:
async def create_identity_async(self, identity: IdentityCreate, actor: PydanticUser) -> PydanticIdentity:
async with db_registry.async_session() as session:
new_identity = IdentityModel(**identity.model_dump(exclude={"agent_ids", "block_ids"}, exclude_unset=True))
new_identity.organization_id = actor.organization_id
self._process_relationship(
await self._process_relationship_async(
session=session,
identity=new_identity,
relationship_name="agents",
@@ -65,7 +66,7 @@ class IdentityManager:
item_ids=identity.agent_ids,
allow_partial=False,
)
self._process_relationship(
await self._process_relationship_async(
session=session,
identity=new_identity,
relationship_name="blocks",
@@ -73,13 +74,13 @@ class IdentityManager:
item_ids=identity.block_ids,
allow_partial=False,
)
new_identity.create(session, actor=actor)
await new_identity.create_async(session, actor=actor)
return new_identity.to_pydantic()
@enforce_types
def upsert_identity(self, identity: IdentityUpsert, actor: PydanticUser) -> PydanticIdentity:
with db_registry.session() as session:
existing_identity = IdentityModel.read(
async def upsert_identity_async(self, identity: IdentityUpsert, actor: PydanticUser) -> PydanticIdentity:
async with db_registry.async_session() as session:
existing_identity = await IdentityModel.read_async(
db_session=session,
identifier_key=identity.identifier_key,
project_id=identity.project_id,
@@ -88,7 +89,7 @@ class IdentityManager:
)
if existing_identity is None:
return self.create_identity(identity=IdentityCreate(**identity.model_dump()), actor=actor)
return await self.create_identity_async(identity=IdentityCreate(**identity.model_dump()), actor=actor)
else:
identity_update = IdentityUpdate(
name=identity.name,
@@ -97,25 +98,27 @@ class IdentityManager:
agent_ids=identity.agent_ids,
properties=identity.properties,
)
return self._update_identity(
return await self._update_identity_async(
session=session, existing_identity=existing_identity, identity=identity_update, actor=actor, replace=True
)
@enforce_types
def update_identity(self, identity_id: str, identity: IdentityUpdate, actor: PydanticUser, replace: bool = False) -> PydanticIdentity:
with db_registry.session() as session:
async def update_identity_async(
self, identity_id: str, identity: IdentityUpdate, actor: PydanticUser, replace: bool = False
) -> PydanticIdentity:
async with db_registry.async_session() as session:
try:
existing_identity = IdentityModel.read(db_session=session, identifier=identity_id, actor=actor)
existing_identity = await IdentityModel.read_async(db_session=session, identifier=identity_id, actor=actor)
except NoResultFound:
raise HTTPException(status_code=404, detail="Identity not found")
if existing_identity.organization_id != actor.organization_id:
raise HTTPException(status_code=403, detail="Forbidden")
return self._update_identity(
return await self._update_identity_async(
session=session, existing_identity=existing_identity, identity=identity, actor=actor, replace=replace
)
def _update_identity(
async def _update_identity_async(
self,
session: Session,
existing_identity: IdentityModel,
@@ -139,7 +142,7 @@ class IdentityManager:
existing_identity.properties = list(new_properties.values())
if identity.agent_ids is not None:
self._process_relationship(
await self._process_relationship_async(
session=session,
identity=existing_identity,
relationship_name="agents",
@@ -149,7 +152,7 @@ class IdentityManager:
replace=replace,
)
if identity.block_ids is not None:
self._process_relationship(
await self._process_relationship_async(
session=session,
identity=existing_identity,
relationship_name="blocks",
@@ -158,16 +161,18 @@ class IdentityManager:
allow_partial=False,
replace=replace,
)
existing_identity.update(session, actor=actor)
await existing_identity.update_async(session, actor=actor)
return existing_identity.to_pydantic()
@enforce_types
def upsert_identity_properties(self, identity_id: str, properties: List[IdentityProperty], actor: PydanticUser) -> PydanticIdentity:
with db_registry.session() as session:
existing_identity = IdentityModel.read(db_session=session, identifier=identity_id, actor=actor)
async def upsert_identity_properties_async(
self, identity_id: str, properties: List[IdentityProperty], actor: PydanticUser
) -> PydanticIdentity:
async with db_registry.async_session() as session:
existing_identity = await IdentityModel.read_async(db_session=session, identifier=identity_id, actor=actor)
if existing_identity is None:
raise HTTPException(status_code=404, detail="Identity not found")
return self._update_identity(
return await self._update_identity_async(
session=session,
existing_identity=existing_identity,
identity=IdentityUpdate(properties=properties),
@@ -176,15 +181,15 @@ class IdentityManager:
)
@enforce_types
def delete_identity(self, identity_id: str, actor: PydanticUser) -> None:
with db_registry.session() as session:
identity = IdentityModel.read(db_session=session, identifier=identity_id)
async def delete_identity_async(self, identity_id: str, actor: PydanticUser) -> None:
async with db_registry.async_session() as session:
identity = await IdentityModel.read_async(db_session=session, identifier=identity_id, actor=actor)
if identity is None:
raise HTTPException(status_code=404, detail="Identity not found")
if identity.organization_id != actor.organization_id:
raise HTTPException(status_code=403, detail="Forbidden")
session.delete(identity)
session.commit()
await session.delete(identity)
await session.commit()
@enforce_types
async def size_async(
@@ -197,7 +202,7 @@ class IdentityManager:
async with db_registry.async_session() as session:
return await IdentityModel.size_async(db_session=session, actor=actor)
def _process_relationship(
async def _process_relationship_async(
self,
session: Session,
identity: PydanticIdentity,
@@ -214,7 +219,7 @@ class IdentityManager:
return
# Retrieve models for the provided IDs
found_items = session.query(model_class).filter(model_class.id.in_(item_ids)).all()
found_items = (await session.execute(select(model_class).where(model_class.id.in_(item_ids)))).scalars().all()
# Validate all items are found if allow_partial is False
if not allow_partial and len(found_items) != len(item_ids):

View File

@@ -3507,7 +3507,7 @@ async def test_create_and_upsert_identity(server: SyncServer, default_user, even
],
)
identity = server.identity_manager.create_identity(identity_create, actor=default_user)
identity = await server.identity_manager.create_identity_async(identity_create, actor=default_user)
# Assertions to ensure the created identity matches the expected values
assert identity.identifier_key == identity_create.identifier_key
@@ -3518,30 +3518,32 @@ async def test_create_and_upsert_identity(server: SyncServer, default_user, even
assert identity.project_id == None
with pytest.raises(UniqueConstraintViolationError):
server.identity_manager.create_identity(
await server.identity_manager.create_identity_async(
IdentityCreate(identifier_key="1234", name="sarah", identity_type=IdentityType.user),
actor=default_user,
)
identity_create.properties = [(IdentityProperty(key="age", value=29, type=IdentityPropertyType.number))]
identity = server.identity_manager.upsert_identity(identity=IdentityUpsert(**identity_create.model_dump()), actor=default_user)
identity = await server.identity_manager.upsert_identity_async(
identity=IdentityUpsert(**identity_create.model_dump()), actor=default_user
)
identity = await server.identity_manager.get_identity_async(identity_id=identity.id, actor=default_user)
assert len(identity.properties) == 1
assert identity.properties[0].key == "age"
assert identity.properties[0].value == 29
server.identity_manager.delete_identity(identity_id=identity.id, actor=default_user)
await server.identity_manager.delete_identity_async(identity_id=identity.id, actor=default_user)
@pytest.mark.asyncio
async def test_get_identities(server, default_user):
# Create identities to retrieve later
user = server.identity_manager.create_identity(
user = await server.identity_manager.create_identity_async(
IdentityCreate(name="caren", identifier_key="1234", identity_type=IdentityType.user), actor=default_user
)
org = server.identity_manager.create_identity(
org = await server.identity_manager.create_identity_async(
IdentityCreate(name="letta", identifier_key="0001", identity_type=IdentityType.org), actor=default_user
)
@@ -3557,13 +3559,13 @@ async def test_get_identities(server, default_user):
assert len(org_identities) == 1
assert org_identities[0].name == org.name
server.identity_manager.delete_identity(identity_id=user.id, actor=default_user)
server.identity_manager.delete_identity(identity_id=org.id, actor=default_user)
await server.identity_manager.delete_identity_async(identity_id=user.id, actor=default_user)
await server.identity_manager.delete_identity_async(identity_id=org.id, actor=default_user)
@pytest.mark.asyncio
async def test_update_identity(server: SyncServer, sarah_agent, charles_agent, default_user, event_loop):
identity = server.identity_manager.create_identity(
identity = await server.identity_manager.create_identity_async(
IdentityCreate(name="caren", identifier_key="1234", identity_type=IdentityType.user), actor=default_user
)
@@ -3572,7 +3574,7 @@ async def test_update_identity(server: SyncServer, sarah_agent, charles_agent, d
agent_ids=[sarah_agent.id, charles_agent.id],
properties=[IdentityProperty(key="email", value="caren@letta.com", type=IdentityPropertyType.string)],
)
server.identity_manager.update_identity(identity_id=identity.id, identity=update_data, actor=default_user)
await server.identity_manager.update_identity_async(identity_id=identity.id, identity=update_data, actor=default_user)
# Retrieve the updated identity
updated_identity = await server.identity_manager.get_identity_async(identity_id=identity.id, actor=default_user)
@@ -3586,16 +3588,16 @@ async def test_update_identity(server: SyncServer, sarah_agent, charles_agent, d
agent_state = await server.agent_manager.get_agent_by_id_async(agent_id=charles_agent.id, actor=default_user)
assert identity.id in agent_state.identity_ids
server.identity_manager.delete_identity(identity_id=identity.id, actor=default_user)
await server.identity_manager.delete_identity_async(identity_id=identity.id, actor=default_user)
@pytest.mark.asyncio
async def test_attach_detach_identity_from_agent(server: SyncServer, sarah_agent, default_user, event_loop):
# Create an identity
identity = server.identity_manager.create_identity(
identity = await server.identity_manager.create_identity_async(
IdentityCreate(name="caren", identifier_key="1234", identity_type=IdentityType.user), actor=default_user
)
agent_state = server.agent_manager.update_agent(
agent_state = await server.agent_manager.update_agent_async(
agent_id=sarah_agent.id, agent_update=UpdateAgent(identity_ids=[identity.id]), actor=default_user
)
@@ -3603,7 +3605,7 @@ async def test_attach_detach_identity_from_agent(server: SyncServer, sarah_agent
assert identity.id in agent_state.identity_ids
# Now attempt to delete the identity
server.identity_manager.delete_identity(identity_id=identity.id, actor=default_user)
await server.identity_manager.delete_identity_async(identity_id=identity.id, actor=default_user)
# Verify that the identity was deleted
identities = await server.identity_manager.list_identities_async(actor=default_user)
@@ -3614,13 +3616,14 @@ async def test_attach_detach_identity_from_agent(server: SyncServer, sarah_agent
assert not identity.id in agent_state.identity_ids
def test_get_set_agents_for_identities(server: SyncServer, sarah_agent, charles_agent, default_user):
identity = server.identity_manager.create_identity(
@pytest.mark.asyncio
async def test_get_set_agents_for_identities(server: SyncServer, sarah_agent, charles_agent, default_user, event_loop):
identity = await server.identity_manager.create_identity_async(
IdentityCreate(name="caren", identifier_key="1234", identity_type=IdentityType.user, agent_ids=[sarah_agent.id, charles_agent.id]),
actor=default_user,
)
agent_with_identity = server.create_agent(
agent_with_identity = await server.create_agent_async(
CreateAgent(
memory_blocks=[],
llm_config=LLMConfig.default_config("gpt-4o-mini"),
@@ -3641,7 +3644,7 @@ def test_get_set_agents_for_identities(server: SyncServer, sarah_agent, charles_
)
# Get the agents for identity id
agent_states = server.agent_manager.list_agents(identity_id=identity.id, actor=default_user)
agent_states = await server.agent_manager.list_agents_async(identity_id=identity.id, actor=default_user)
assert len(agent_states) == 3
# Check all agents are in the list
@@ -3652,7 +3655,7 @@ def test_get_set_agents_for_identities(server: SyncServer, sarah_agent, charles_
assert not agent_without_identity.id in agent_state_ids
# Get the agents for identifier key
agent_states = server.agent_manager.list_agents(identifier_keys=[identity.identifier_key], actor=default_user)
agent_states = await server.agent_manager.list_agents_async(identifier_keys=[identity.identifier_key], actor=default_user)
assert len(agent_states) == 3
# Check all agents are in the list
@@ -3675,13 +3678,13 @@ def test_get_set_agents_for_identities(server: SyncServer, sarah_agent, charles_
assert sarah_agent.id in agent_state_ids
assert charles_agent.id in agent_state_ids
server.identity_manager.delete_identity(identity_id=identity.id, actor=default_user)
await server.identity_manager.delete_identity_async(identity_id=identity.id, actor=default_user)
@pytest.mark.asyncio
async def test_attach_detach_identity_from_block(server: SyncServer, default_block, default_user, event_loop):
# Create an identity
identity = server.identity_manager.create_identity(
identity = await server.identity_manager.create_identity_async(
IdentityCreate(name="caren", identifier_key="1234", identity_type=IdentityType.user, block_ids=[default_block.id]),
actor=default_user,
)
@@ -3691,7 +3694,7 @@ async def test_attach_detach_identity_from_block(server: SyncServer, default_blo
assert len(blocks) == 1 and blocks[0].id == default_block.id
# Now attempt to delete the identity
server.identity_manager.delete_identity(identity_id=identity.id, actor=default_user)
await server.identity_manager.delete_identity_async(identity_id=identity.id, actor=default_user)
# Verify that the identity was deleted
identities = await server.identity_manager.list_identities_async(actor=default_user)
@@ -3707,7 +3710,7 @@ async def test_get_set_blocks_for_identities(server: SyncServer, default_block,
block_manager = BlockManager()
block_with_identity = block_manager.create_or_update_block(PydanticBlock(label="persona", value="Original Content"), actor=default_user)
block_without_identity = block_manager.create_or_update_block(PydanticBlock(label="user", value="Original Content"), actor=default_user)
identity = server.identity_manager.create_identity(
identity = await server.identity_manager.create_identity_async(
IdentityCreate(
name="caren", identifier_key="1234", identity_type=IdentityType.user, block_ids=[default_block.id, block_with_identity.id]
),
@@ -3748,10 +3751,11 @@ async def test_get_set_blocks_for_identities(server: SyncServer, default_block,
assert not block_with_identity.id in block_ids
assert not block_without_identity.id in block_ids
server.identity_manager.delete_identity(identity.id, actor=default_user)
await server.identity_manager.delete_identity_async(identity_id=identity.id, actor=default_user)
def test_upsert_properties(server: SyncServer, default_user):
@pytest.mark.asyncio
async def test_upsert_properties(server: SyncServer, default_user, event_loop):
identity_create = IdentityCreate(
identifier_key="1234",
name="caren",
@@ -3762,21 +3766,21 @@ def test_upsert_properties(server: SyncServer, default_user):
],
)
identity = server.identity_manager.create_identity(identity_create, actor=default_user)
identity = await server.identity_manager.create_identity_async(identity_create, actor=default_user)
properties = [
IdentityProperty(key="email", value="caren@gmail.com", type=IdentityPropertyType.string),
IdentityProperty(key="age", value="28", type=IdentityPropertyType.string),
IdentityProperty(key="test", value=123, type=IdentityPropertyType.number),
]
updated_identity = server.identity_manager.upsert_identity_properties(
updated_identity = await server.identity_manager.upsert_identity_properties_async(
identity_id=identity.id,
properties=properties,
actor=default_user,
)
assert updated_identity.properties == properties
server.identity_manager.delete_identity(identity.id, actor=default_user)
await server.identity_manager.delete_identity_async(identity_id=identity.id, actor=default_user)
# ======================================================================================================================