feat: move decryption outside db session (#8323)
* feat: move decryption outside db session * fix pydantic error
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
import asyncio
|
||||
import base64
|
||||
import os
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from functools import lru_cache
|
||||
from typing import Optional
|
||||
|
||||
@@ -11,6 +12,10 @@ from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||
|
||||
from letta.settings import settings
|
||||
|
||||
# Dedicated thread pool for CPU-intensive crypto operations
|
||||
# Prevents crypto from blocking health checks and other operations
|
||||
_crypto_executor = ThreadPoolExecutor(max_workers=8, thread_name_prefix="CryptoWorker")
|
||||
|
||||
# Common API key prefixes that should not be considered encrypted
|
||||
# These are plaintext credentials that happen to be long strings
|
||||
PLAINTEXT_PREFIXES = (
|
||||
@@ -69,13 +74,16 @@ class CryptoUtils:
|
||||
@classmethod
|
||||
async def _derive_key_async(cls, master_key: str, salt: bytes) -> bytes:
|
||||
"""
|
||||
Async version of _derive_key that runs PBKDF2 in a thread pool.
|
||||
Async version of _derive_key that runs PBKDF2 in a dedicated thread pool.
|
||||
|
||||
This prevents PBKDF2 (a CPU-intensive operation) from blocking the event loop.
|
||||
PBKDF2 with 100k iterations typically takes 100-500ms, which would freeze
|
||||
the event loop and prevent all other requests from being processed.
|
||||
Uses a dedicated crypto thread pool (8 workers) to prevent PBKDF2 operations
|
||||
from exhausting the default ThreadPoolExecutor (16 threads) and blocking
|
||||
health checks and other operations during high load.
|
||||
|
||||
PBKDF2 with 100k iterations typically takes 100-500ms per operation.
|
||||
"""
|
||||
return await asyncio.to_thread(cls._derive_key, master_key, salt)
|
||||
loop = asyncio.get_event_loop()
|
||||
return await loop.run_in_executor(_crypto_executor, cls._derive_key, master_key, salt)
|
||||
|
||||
@classmethod
|
||||
def encrypt(cls, plaintext: str, master_key: Optional[str] = None) -> str:
|
||||
|
||||
@@ -310,6 +310,7 @@ class Agent(SqlalchemyBase, OrganizationMixin, ProjectMixin, TemplateEntityMixin
|
||||
self,
|
||||
include_relationships: Optional[Set[str]] = None,
|
||||
include: Optional[List[str]] = None,
|
||||
decrypt: bool = True,
|
||||
) -> PydanticAgentState:
|
||||
"""
|
||||
Converts the SQLAlchemy Agent model into its Pydantic counterpart.
|
||||
@@ -441,8 +442,26 @@ class Agent(SqlalchemyBase, OrganizationMixin, ProjectMixin, TemplateEntityMixin
|
||||
state["identities"] = [i.to_pydantic() for i in identities]
|
||||
state["multi_agent_group"] = multi_agent_group
|
||||
state["managed_group"] = multi_agent_group
|
||||
# Convert ORM env vars to Pydantic with async decryption
|
||||
env_vars_pydantic = await bounded_gather([PydanticAgentEnvVar.from_orm_async(e) for e in tool_exec_environment_variables])
|
||||
# Convert ORM env vars to Pydantic, optionally skipping decryption
|
||||
if decrypt:
|
||||
env_vars_pydantic = await bounded_gather([PydanticAgentEnvVar.from_orm_async(e) for e in tool_exec_environment_variables])
|
||||
else:
|
||||
# Skip decryption - return with encrypted values (faster, no PBKDF2)
|
||||
from letta.schemas.environment_variables import AgentEnvironmentVariable
|
||||
from letta.schemas.secret import Secret
|
||||
|
||||
env_vars_pydantic = []
|
||||
for e in tool_exec_environment_variables:
|
||||
data = {
|
||||
"id": e.id,
|
||||
"key": e.key,
|
||||
"description": e.description,
|
||||
"organization_id": e.organization_id,
|
||||
"agent_id": e.agent_id,
|
||||
"value": "", # Empty string, will be decrypted later
|
||||
"value_enc": Secret.from_encrypted(e.value_enc) if e.value_enc else None,
|
||||
}
|
||||
env_vars_pydantic.append(AgentEnvironmentVariable.model_validate(data))
|
||||
state["tool_exec_environment_variables"] = env_vars_pydantic
|
||||
state["secrets"] = env_vars_pydantic
|
||||
state["model"] = self.llm_config.handle if self.llm_config else None
|
||||
|
||||
@@ -902,6 +902,34 @@ class AgentManager:
|
||||
# context manager now handles commits
|
||||
# await session.commit()
|
||||
|
||||
async def _decrypt_agent_secrets(self, agents: List[PydanticAgentState]) -> List[PydanticAgentState]:
|
||||
"""
|
||||
Decrypt secrets for all agents outside DB session.
|
||||
|
||||
This allows DB connections to be released before expensive PBKDF2 operations,
|
||||
preventing connection pool exhaustion during high load.
|
||||
|
||||
Uses bounded concurrency to limit thread pool pressure while allowing some
|
||||
parallelism in the dedicated crypto executor.
|
||||
"""
|
||||
|
||||
async def decrypt_env_var(env_var):
|
||||
if env_var.value_enc and (env_var.value is None or env_var.value == ""):
|
||||
env_var.value = await env_var.value_enc.get_plaintext_async()
|
||||
|
||||
# Collect all env vars that need decryption
|
||||
decrypt_tasks = []
|
||||
for agent in agents:
|
||||
if agent.tool_exec_environment_variables:
|
||||
for env_var in agent.tool_exec_environment_variables:
|
||||
decrypt_tasks.append(decrypt_env_var(env_var))
|
||||
|
||||
# Decrypt with bounded concurrency (matches crypto executor size)
|
||||
if decrypt_tasks:
|
||||
await bounded_gather(decrypt_tasks, max_concurrency=8)
|
||||
|
||||
return agents
|
||||
|
||||
@trace_method
|
||||
async def list_agents_async(
|
||||
self,
|
||||
@@ -970,10 +998,16 @@ class AgentManager:
|
||||
query = query.limit(limit)
|
||||
result = await session.execute(query)
|
||||
agents = result.scalars().all()
|
||||
return await bounded_gather(
|
||||
[agent.to_pydantic_async(include_relationships=include_relationships, include=include) for agent in agents]
|
||||
|
||||
# Convert to pydantic without decrypting (keeps encrypted values)
|
||||
# This allows us to release the DB connection before expensive PBKDF2 operations
|
||||
agents_encrypted = await bounded_gather(
|
||||
[agent.to_pydantic_async(include_relationships=include_relationships, include=include, decrypt=False) for agent in agents]
|
||||
)
|
||||
|
||||
# DB session released - now decrypt secrets outside session to prevent connection holding
|
||||
return await self._decrypt_agent_secrets(agents_encrypted)
|
||||
|
||||
@trace_method
|
||||
async def count_agents_async(
|
||||
self,
|
||||
|
||||
Reference in New Issue
Block a user