diff --git a/letta/helpers/crypto_utils.py b/letta/helpers/crypto_utils.py
index 6cbdc5e7..682efb46 100644
--- a/letta/helpers/crypto_utils.py
+++ b/letta/helpers/crypto_utils.py
@@ -1,6 +1,7 @@
 import asyncio
 import base64
 import os
+from concurrent.futures import ThreadPoolExecutor
 from functools import lru_cache
 from typing import Optional
 
@@ -11,6 +12,10 @@ from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
 
 from letta.settings import settings
 
+# Dedicated thread pool for CPU-intensive crypto operations
+# Prevents crypto from blocking health checks and other operations
+_crypto_executor = ThreadPoolExecutor(max_workers=8, thread_name_prefix="CryptoWorker")
+
 # Common API key prefixes that should not be considered encrypted
 # These are plaintext credentials that happen to be long strings
 PLAINTEXT_PREFIXES = (
@@ -69,13 +74,16 @@ class CryptoUtils:
     @classmethod
     async def _derive_key_async(cls, master_key: str, salt: bytes) -> bytes:
         """
-        Async version of _derive_key that runs PBKDF2 in a thread pool.
+        Async version of _derive_key that runs PBKDF2 in a dedicated thread pool.
 
-        This prevents PBKDF2 (a CPU-intensive operation) from blocking the event loop.
-        PBKDF2 with 100k iterations typically takes 100-500ms, which would freeze
-        the event loop and prevent all other requests from being processed.
+        Uses a dedicated crypto thread pool (8 workers) to prevent PBKDF2 operations
+        from exhausting the default ThreadPoolExecutor (16 threads) and blocking
+        health checks and other operations during high load.
+
+        PBKDF2 with 100k iterations typically takes 100-500ms per operation.
         """
-        return await asyncio.to_thread(cls._derive_key, master_key, salt)
+        loop = asyncio.get_running_loop()  # we are inside a coroutine: use the loop that is running it
+        return await loop.run_in_executor(_crypto_executor, cls._derive_key, master_key, salt)
 
     @classmethod
     def encrypt(cls, plaintext: str, master_key: Optional[str] = None) -> str:
diff --git a/letta/orm/agent.py b/letta/orm/agent.py
index cea425ef..3b30ad8a 100644
--- a/letta/orm/agent.py
+++ b/letta/orm/agent.py
@@ -310,6 +310,7 @@ class Agent(SqlalchemyBase, OrganizationMixin, ProjectMixin, TemplateEntityMixin
         self,
         include_relationships: Optional[Set[str]] = None,
         include: Optional[List[str]] = None,
+        decrypt: bool = True,
     ) -> PydanticAgentState:
         """
         Converts the SQLAlchemy Agent model into its Pydantic counterpart.
@@ -441,8 +442,26 @@ class Agent(SqlalchemyBase, OrganizationMixin, ProjectMixin, TemplateEntityMixin
         state["identities"] = [i.to_pydantic() for i in identities]
         state["multi_agent_group"] = multi_agent_group
         state["managed_group"] = multi_agent_group
-        # Convert ORM env vars to Pydantic with async decryption
-        env_vars_pydantic = await bounded_gather([PydanticAgentEnvVar.from_orm_async(e) for e in tool_exec_environment_variables])
+        # Convert ORM env vars to Pydantic, optionally skipping decryption
+        if decrypt:
+            env_vars_pydantic = await bounded_gather([PydanticAgentEnvVar.from_orm_async(e) for e in tool_exec_environment_variables])
+        else:
+            # Skip decryption - return with encrypted values (faster, no PBKDF2)
+            from letta.schemas.environment_variables import AgentEnvironmentVariable
+            from letta.schemas.secret import Secret
+
+            env_vars_pydantic = []
+            for e in tool_exec_environment_variables:
+                data = {
+                    "id": e.id,
+                    "key": e.key,
+                    "description": e.description,
+                    "organization_id": e.organization_id,
+                    "agent_id": e.agent_id,
+                    "value": "",  # Empty string, will be decrypted later
+                    "value_enc": Secret.from_encrypted(e.value_enc) if e.value_enc else None,
+                }
+                env_vars_pydantic.append(AgentEnvironmentVariable.model_validate(data))
         state["tool_exec_environment_variables"] = env_vars_pydantic
         state["secrets"] = env_vars_pydantic
         state["model"] = self.llm_config.handle if self.llm_config else None
diff --git a/letta/services/agent_manager.py b/letta/services/agent_manager.py
index 4df469e0..5ddc3b89 100644
--- a/letta/services/agent_manager.py
+++ b/letta/services/agent_manager.py
@@ -902,6 +902,34 @@ class AgentManager:
             # context manager now handles commits
             # await session.commit()
 
+    async def _decrypt_agent_secrets(self, agents: List[PydanticAgentState]) -> List[PydanticAgentState]:
+        """
+        Decrypt secrets for all agents outside DB session.
+
+        This allows DB connections to be released before expensive PBKDF2 operations,
+        preventing connection pool exhaustion during high load.
+
+        Uses bounded concurrency to limit thread pool pressure while allowing some
+        parallelism in the dedicated crypto executor.
+        """
+
+        async def decrypt_env_var(env_var):
+            if env_var.value_enc and (env_var.value is None or env_var.value == ""):
+                env_var.value = await env_var.value_enc.get_plaintext_async()
+
+        # Collect all env vars that need decryption
+        decrypt_tasks = []
+        for agent in agents:
+            if agent.tool_exec_environment_variables:
+                for env_var in agent.tool_exec_environment_variables:
+                    decrypt_tasks.append(decrypt_env_var(env_var))
+
+        # Decrypt with bounded concurrency (matches crypto executor size)
+        if decrypt_tasks:
+            await bounded_gather(decrypt_tasks, max_concurrency=8)
+
+        return agents
+
     @trace_method
     async def list_agents_async(
         self,
@@ -970,10 +998,16 @@ class AgentManager:
                 query = query.limit(limit)
             result = await session.execute(query)
             agents = result.scalars().all()
-            return await bounded_gather(
-                [agent.to_pydantic_async(include_relationships=include_relationships, include=include) for agent in agents]
+
+            # Convert to pydantic without decrypting (keeps encrypted values)
+            # This allows us to release the DB connection before expensive PBKDF2 operations
+            agents_encrypted = await bounded_gather(
+                [agent.to_pydantic_async(include_relationships=include_relationships, include=include, decrypt=False) for agent in agents]
             )
 
+        # DB session released - now decrypt secrets outside session to prevent connection holding
+        return await self._decrypt_agent_secrets(agents_encrypted)
+
     @trace_method
     async def count_agents_async(
         self,