feat: read from encrypted values with error on fallback [LET-4250] (#6484)

* base

* warning

---------

Co-authored-by: Letta Bot <noreply@letta.com>
This commit is contained in:
jnjpng
2025-12-03 12:36:31 -08:00
committed by Caren Thomas
parent 647e271c2a
commit 89ca8fe8b0
9 changed files with 227 additions and 176 deletions

View File

@@ -21,12 +21,12 @@ class EnvironmentVariableBase(OrmMetadataBase):
value_enc: Secret | None = Field(None, description="Encrypted value as Secret object")
def get_value_secret(self) -> Secret:
"""Get the value as a Secret object, preferring encrypted over plaintext."""
"""Get the value as a Secret object. Prefers encrypted, falls back to plaintext with error logging."""
# If value_enc is already a Secret, return it
if self.value_enc is not None:
return self.value_enc
# Otherwise, create from plaintext value
return Secret.from_db(None, self.value)
# Fallback to plaintext with error logging via Secret.from_db()
return Secret.from_db(encrypted_value=None, plaintext_value=self.value)
def set_value_secret(self, secret: Secret) -> None:
"""Set value from a Secret object, directly storing the Secret."""

View File

@@ -49,20 +49,20 @@ class MCPServer(BaseMCPServer):
metadata_: Optional[Dict[str, Any]] = Field(default_factory=dict, description="A dictionary of additional metadata for the tool.")
def get_token_secret(self) -> Secret:
"""Get the token as a Secret object, preferring encrypted over plaintext."""
"""Get the token as a Secret object. Prefers encrypted, falls back to plaintext with error logging."""
if self.token_enc is not None:
return self.token_enc
return Secret.from_db(None, self.token)
# Fallback to plaintext with error logging via Secret.from_db()
return Secret.from_db(encrypted_value=None, plaintext_value=self.token)
def get_custom_headers_secret(self) -> Secret:
"""Get custom headers as a Secret object (stores JSON string), preferring encrypted over plaintext."""
"""Get custom headers as a Secret object (stores JSON string). Prefers encrypted, falls back to plaintext with error logging."""
if self.custom_headers_enc is not None:
return self.custom_headers_enc
# Fallback: convert plaintext dict to JSON string and wrap in Secret
if self.custom_headers is not None:
json_str = json.dumps(self.custom_headers)
return Secret.from_plaintext(json_str)
return Secret.from_plaintext(None)
# Fallback to plaintext with error logging via Secret.from_db()
# Convert dict to JSON string for Secret storage
plaintext_json = json.dumps(self.custom_headers) if self.custom_headers else None
return Secret.from_db(encrypted_value=None, plaintext_value=plaintext_json)
def get_custom_headers_dict(self) -> Optional[Dict[str, str]]:
"""Get custom headers as a plaintext dictionary."""

View File

@@ -165,28 +165,32 @@ class MCPOAuthSession(BaseMCPOAuth):
updated_at: datetime = Field(default_factory=datetime.now, description="Last update time")
def get_access_token_secret(self) -> Secret:
"""Get the access token as a Secret object, preferring encrypted over plaintext."""
"""Get the access token as a Secret object. Prefers encrypted, falls back to plaintext with error logging."""
if self.access_token_enc is not None:
return self.access_token_enc
return Secret.from_db(None, self.access_token)
# Fallback to plaintext with error logging via Secret.from_db()
return Secret.from_db(encrypted_value=None, plaintext_value=self.access_token)
def get_refresh_token_secret(self) -> Secret:
"""Get the refresh token as a Secret object, preferring encrypted over plaintext."""
"""Get the refresh token as a Secret object. Prefers encrypted, falls back to plaintext with error logging."""
if self.refresh_token_enc is not None:
return self.refresh_token_enc
return Secret.from_db(None, self.refresh_token)
# Fallback to plaintext with error logging via Secret.from_db()
return Secret.from_db(encrypted_value=None, plaintext_value=self.refresh_token)
def get_client_secret_secret(self) -> Secret:
"""Get the client secret as a Secret object, preferring encrypted over plaintext."""
"""Get the client secret as a Secret object. Prefers encrypted, falls back to plaintext with error logging."""
if self.client_secret_enc is not None:
return self.client_secret_enc
return Secret.from_db(None, self.client_secret)
# Fallback to plaintext with error logging via Secret.from_db()
return Secret.from_db(encrypted_value=None, plaintext_value=self.client_secret)
def get_authorization_code_secret(self) -> Secret:
"""Get the authorization code as a Secret object, preferring encrypted over plaintext."""
"""Get the authorization code as a Secret object. Prefers encrypted, falls back to plaintext with error logging."""
if self.authorization_code_enc is not None:
return self.authorization_code_enc
return Secret.from_db(None, self.authorization_code)
# Fallback to plaintext with error logging via Secret.from_db()
return Secret.from_db(encrypted_value=None, plaintext_value=self.authorization_code)
def set_access_token_secret(self, secret: Secret) -> None:
"""Set access token from a Secret object."""

View File

@@ -50,20 +50,20 @@ class Provider(ProviderBase):
self.id = ProviderBase.generate_id(prefix=ProviderBase.__id_prefix__)
def get_api_key_secret(self) -> Secret:
"""Get the API key as a Secret object, preferring encrypted over plaintext."""
"""Get the API key as a Secret object. Prefers encrypted, falls back to plaintext with error logging."""
# If api_key_enc is already a Secret, return it
if self.api_key_enc is not None:
return self.api_key_enc
# Otherwise, create from plaintext api_key
return Secret.from_db(None, self.api_key)
# Fallback to plaintext with error logging via Secret.from_db()
return Secret.from_db(encrypted_value=None, plaintext_value=self.api_key)
def get_access_key_secret(self) -> Secret:
"""Get the access key as a Secret object, preferring encrypted over plaintext."""
"""Get the access key as a Secret object. Prefers encrypted, falls back to plaintext with error logging."""
# If access_key_enc is already a Secret, return it
if self.access_key_enc is not None:
return self.access_key_enc
# Otherwise, create from plaintext access_key
return Secret.from_db(None, self.access_key)
# Fallback to plaintext with error logging via Secret.from_db()
return Secret.from_db(encrypted_value=None, plaintext_value=self.access_key)
def set_api_key_secret(self, secret: Secret) -> None:
"""Set API key from a Secret object, directly storing the Secret."""

View File

@@ -17,11 +17,14 @@ class Secret(BaseModel):
This class ensures that sensitive data remains encrypted as much as possible
while passing through the codebase, only decrypting when absolutely necessary.
TODO: Once we deprecate plaintext columns in the database:
- Remove the dual-write logic in to_dict()
- Remove the from_db() method's plaintext_value parameter
- Remove the was_encrypted flag (no longer needed for migration)
- Simplify get_plaintext() to only handle encrypted values
Migration status (Phase 1 - encrypted-first reads with plaintext fallback):
- Reads: Prefer _enc columns, fallback to plaintext columns with ERROR logging
- Writes: Still dual-write to both _enc and plaintext columns for backward compatibility
- Encryption: Optional - if LETTA_ENCRYPTION_KEY is not set, stores plaintext in _enc column
TODO (Phase 2): Remove plaintext fallback in from_db() after verifying no error logs
TODO (Phase 3): Remove dual-write logic in to_dict() and set_*_secret() methods
TODO (Phase 4): Remove from_db() plaintext_value parameter, was_encrypted flag, and plaintext columns
"""
# Store the encrypted value as a regular field
@@ -38,11 +41,14 @@ class Secret(BaseModel):
"""
Create a Secret from a plaintext value, encrypting it if possible.
If LETTA_ENCRYPTION_KEY is configured, the value is encrypted.
If not, the plaintext value is stored directly in encrypted_value field.
Args:
value: The plaintext value to encrypt
Returns:
A Secret instance with the encrypted value, or plaintext if encryption unavailable
A Secret instance with the encrypted (or plaintext) value
"""
if value is None:
return cls.model_construct(encrypted_value=None, was_encrypted=False)
@@ -51,19 +57,19 @@ class Secret(BaseModel):
if CryptoUtils.is_encrypted(value):
logger.warning("Creating Secret from already-encrypted value. This can be dangerous.")
# Try to encrypt, but fall back to plaintext if no encryption key
# Try to encrypt, but fall back to storing plaintext if no encryption key
try:
encrypted = CryptoUtils.encrypt(value)
return cls.model_construct(encrypted_value=encrypted, was_encrypted=False)
except ValueError as e:
# No encryption key available, store as plaintext
# No encryption key available, store as plaintext in the _enc column
if "No encryption key configured" in str(e):
logger.warning(
"No encryption key configured. Storing Secret value as plaintext. "
"No encryption key configured. Storing Secret value as plaintext in _enc column. "
"Set LETTA_ENCRYPTION_KEY environment variable to enable encryption."
)
instance = cls.model_construct(encrypted_value=value, was_encrypted=False)
instance._plaintext_cache = value # Cache it
instance._plaintext_cache = value # Cache it since we know the plaintext
return instance
raise # Re-raise if it's a different error
@@ -81,25 +87,35 @@ class Secret(BaseModel):
return cls.model_construct(encrypted_value=encrypted_value, was_encrypted=True)
@classmethod
def from_db(cls, encrypted_value: Optional[str], plaintext_value: Optional[str]) -> "Secret":
def from_db(cls, encrypted_value: Optional[str], plaintext_value: Optional[str] = None) -> "Secret":
"""
Create a Secret from database values during migration phase.
Create a Secret from database values. Prefers encrypted column, falls back to plaintext with error logging.
Prefers encrypted value if available, falls back to plaintext.
During Phase 1 of migration, this method:
1. Uses encrypted_value if available (preferred)
2. Falls back to plaintext_value with ERROR logging if encrypted is unavailable
3. Returns empty Secret if neither is available
The error logging helps identify any records that haven't been migrated to encrypted columns.
Args:
encrypted_value: The encrypted value from the database
plaintext_value: The plaintext value from the database
encrypted_value: The encrypted value from the database (_enc column)
plaintext_value: The plaintext value from the database (legacy column, fallback only)
Returns:
A Secret instance
A Secret instance with the value from encrypted or plaintext column
"""
if encrypted_value is not None:
return cls.from_encrypted(encrypted_value)
elif plaintext_value is not None:
# Fallback to plaintext with error logging - this helps identify unmigrated data
if plaintext_value is not None:
logger.error(
"MIGRATION_NEEDED: Reading from plaintext column instead of encrypted column. "
"This indicates data that hasn't been migrated to the _enc column yet. "
"Please run the backfill migration to populate _enc columns."
)
return cls.from_plaintext(plaintext_value)
else:
return cls.from_plaintext(None)
return cls.from_plaintext(None)
def get_encrypted(self) -> Optional[str]:
"""
@@ -117,23 +133,20 @@ class Secret(BaseModel):
This should only be called when the plaintext is actually needed,
such as when making an external API call.
If the value is encrypted, it will be decrypted. If the value is stored
as plaintext (no encryption key was configured), it will be returned as-is.
Returns:
The decrypted plaintext value
The decrypted plaintext value, or None if the secret is empty
"""
if self.encrypted_value is None:
return None
# Use cached value if available, but only if it looks like plaintext
# or we're confident we can decrypt it
# Use cached value if available
if self._plaintext_cache is not None:
# If we have a cache but the stored value looks encrypted and we have no key,
# we should not use the cache
if CryptoUtils.is_encrypted(self.encrypted_value) and not CryptoUtils.is_encryption_available():
self._plaintext_cache = None # Clear invalid cache
else:
return self._plaintext_cache
return self._plaintext_cache
# Decrypt and cache
# Try to decrypt
try:
plaintext = CryptoUtils.decrypt(self.encrypted_value)
# Cache the decrypted value (PrivateAttr fields can be mutated even with frozen=True)
@@ -142,16 +155,14 @@ class Secret(BaseModel):
except ValueError as e:
error_msg = str(e)
# Handle missing encryption key
# Handle missing encryption key - check if value is actually plaintext
if "No encryption key configured" in error_msg:
# Check if the value looks encrypted
if CryptoUtils.is_encrypted(self.encrypted_value):
# Value was encrypted, but now we have no key - can't decrypt
# Value was encrypted but we have no key - can't decrypt
logger.warning(
"Cannot decrypt Secret value - no encryption key configured. "
"The value was encrypted and requires the original key to decrypt."
)
# Return None to indicate we can't get the plaintext
return None
else:
# Value is plaintext (stored when no key was available)
@@ -159,9 +170,8 @@ class Secret(BaseModel):
self._plaintext_cache = self.encrypted_value
return self.encrypted_value
# Handle decryption failure (might be plaintext stored as such)
# Handle decryption failure - check if value might be plaintext
elif "Failed to decrypt data" in error_msg:
# Check if it might be plaintext
if not CryptoUtils.is_encrypted(self.encrypted_value):
# It's plaintext that was stored when no key was available
logger.debug("Secret value appears to be plaintext (stored without encryption)")
@@ -171,13 +181,6 @@ class Secret(BaseModel):
logger.error("Failed to decrypt Secret value - data may be corrupted or wrong key")
raise
# Migration case: handle legacy plaintext
elif not self.was_encrypted:
if self.encrypted_value and not CryptoUtils.is_encrypted(self.encrypted_value):
self._plaintext_cache = self.encrypted_value
return self.encrypted_value
return None
# Re-raise for other errors
raise

View File

@@ -833,44 +833,26 @@ class MCPManager:
def _oauth_orm_to_pydantic(self, oauth_session: MCPOAuth) -> MCPOAuthSession:
"""
Convert OAuth ORM model to Pydantic model, handling decryption of sensitive fields.
Note: Prefers encrypted columns (_enc fields), falls back to plaintext with error logging.
This helps identify unmigrated data during the migration period.
"""
# Get decrypted values using the dual-read approach
# Secret.from_db() will automatically use settings.encryption_key if available
access_token = None
if oauth_session.access_token_enc or oauth_session.access_token:
if settings.encryption_key:
secret = Secret.from_db(oauth_session.access_token_enc, oauth_session.access_token)
access_token = secret.get_plaintext()
else:
# No encryption key, use plaintext if available
access_token = oauth_session.access_token
# Get decrypted values - prefer encrypted, fallback to plaintext with error logging
access_token = Secret.from_db(
encrypted_value=oauth_session.access_token_enc, plaintext_value=oauth_session.access_token
).get_plaintext()
refresh_token = None
if oauth_session.refresh_token_enc or oauth_session.refresh_token:
if settings.encryption_key:
secret = Secret.from_db(oauth_session.refresh_token_enc, oauth_session.refresh_token)
refresh_token = secret.get_plaintext()
else:
# No encryption key, use plaintext if available
refresh_token = oauth_session.refresh_token
refresh_token = Secret.from_db(
encrypted_value=oauth_session.refresh_token_enc, plaintext_value=oauth_session.refresh_token
).get_plaintext()
client_secret = None
if oauth_session.client_secret_enc or oauth_session.client_secret:
if settings.encryption_key:
secret = Secret.from_db(oauth_session.client_secret_enc, oauth_session.client_secret)
client_secret = secret.get_plaintext()
else:
# No encryption key, use plaintext if available
client_secret = oauth_session.client_secret
client_secret = Secret.from_db(
encrypted_value=oauth_session.client_secret_enc, plaintext_value=oauth_session.client_secret
).get_plaintext()
authorization_code = None
if oauth_session.authorization_code_enc or oauth_session.authorization_code:
if settings.encryption_key:
secret = Secret.from_db(oauth_session.authorization_code_enc, oauth_session.authorization_code)
authorization_code = secret.get_plaintext()
else:
# No encryption key, use plaintext if available
authorization_code = oauth_session.authorization_code
authorization_code = Secret.from_db(
encrypted_value=oauth_session.authorization_code_enc, plaintext_value=oauth_session.authorization_code
).get_plaintext()
# Create the Pydantic object with encrypted fields as Secret objects
pydantic_session = MCPOAuthSession(

View File

@@ -988,44 +988,26 @@ class MCPServerManager:
def _oauth_orm_to_pydantic(self, oauth_session: MCPOAuth) -> MCPOAuthSession:
"""
Convert OAuth ORM model to Pydantic model, handling decryption of sensitive fields.
Note: Prefers encrypted columns (_enc fields), falls back to plaintext with error logging.
This helps identify unmigrated data during the migration period.
"""
# Get decrypted values using the dual-read approach
# Secret.from_db() will automatically use settings.encryption_key if available
access_token = None
if oauth_session.access_token_enc or oauth_session.access_token:
if settings.encryption_key:
secret = Secret.from_db(oauth_session.access_token_enc, oauth_session.access_token)
access_token = secret.get_plaintext()
else:
# No encryption key, use plaintext if available
access_token = oauth_session.access_token
# Get decrypted values - prefer encrypted, fallback to plaintext with error logging
access_token = Secret.from_db(
encrypted_value=oauth_session.access_token_enc, plaintext_value=oauth_session.access_token
).get_plaintext()
refresh_token = None
if oauth_session.refresh_token_enc or oauth_session.refresh_token:
if settings.encryption_key:
secret = Secret.from_db(oauth_session.refresh_token_enc, oauth_session.refresh_token)
refresh_token = secret.get_plaintext()
else:
# No encryption key, use plaintext if available
refresh_token = oauth_session.refresh_token
refresh_token = Secret.from_db(
encrypted_value=oauth_session.refresh_token_enc, plaintext_value=oauth_session.refresh_token
).get_plaintext()
client_secret = None
if oauth_session.client_secret_enc or oauth_session.client_secret:
if settings.encryption_key:
secret = Secret.from_db(oauth_session.client_secret_enc, oauth_session.client_secret)
client_secret = secret.get_plaintext()
else:
# No encryption key, use plaintext if available
client_secret = oauth_session.client_secret
client_secret = Secret.from_db(
encrypted_value=oauth_session.client_secret_enc, plaintext_value=oauth_session.client_secret
).get_plaintext()
authorization_code = None
if oauth_session.authorization_code_enc or oauth_session.authorization_code:
if settings.encryption_key:
secret = Secret.from_db(oauth_session.authorization_code_enc, oauth_session.authorization_code)
authorization_code = secret.get_plaintext()
else:
# No encryption key, use plaintext if available
authorization_code = oauth_session.authorization_code
authorization_code = Secret.from_db(
encrypted_value=oauth_session.authorization_code_enc, plaintext_value=oauth_session.authorization_code
).get_plaintext()
# Create the Pydantic object with encrypted fields as Secret objects
pydantic_session = MCPOAuthSession(

View File

@@ -192,38 +192,49 @@ class TestMCPServerEncryption:
settings.encryption_key = original_key
@pytest.mark.asyncio
@patch.dict(os.environ, {}, clear=True) # No encryption key
@patch("letta.services.mcp_manager.MCPManager.get_mcp_client")
async def test_create_mcp_server_without_encryption_key(self, mock_get_client, server, default_user):
"""Test that MCP servers work without encryption key (backward compatibility)."""
# Remove encryption key
os.environ.pop("LETTA_ENCRYPTION_KEY", None)
async def test_create_mcp_server_without_encryption_key_stores_plaintext(self, mock_get_client, server, default_user):
"""Test that MCP servers work without encryption key by storing plaintext in _enc column.
# Mock the MCP client
mock_client = AsyncMock()
mock_client.list_tools.return_value = []
mock_get_client.return_value = mock_client
Note: In Phase 1 of migration, if no encryption key is configured, the value
is stored as plaintext directly in the _enc column. This allows users without
encryption keys to continue working while migrating off the old plaintext columns.
"""
# Save and clear encryption key
original_key = settings.encryption_key
settings.encryption_key = None
server_name = f"test_no_encrypt_server_{uuid4().hex[:8]}"
token = "plaintext-token-no-encryption"
try:
# Mock the MCP client
mock_client = AsyncMock()
mock_client.list_tools.return_value = []
mock_get_client.return_value = mock_client
mcp_server = PydanticMCPServer(
server_name=server_name, server_type=MCPServerType.SSE, server_url="https://api.example.com", token=token
)
server_name = f"test_no_encrypt_server_{uuid4().hex[:8]}"
token = "plaintext-token-no-encryption"
created_server = await server.mcp_manager.create_or_update_mcp_server(mcp_server, actor=default_user)
mcp_server = PydanticMCPServer(
server_name=server_name, server_type=MCPServerType.SSE, server_url="https://api.example.com", token=token
)
# Check database - should store as plaintext
async with db_registry.async_session() as session:
result = await session.execute(select(ORMMCPServer).where(ORMMCPServer.id == created_server.id))
db_server = result.scalar_one()
# Should work without encryption key - stores plaintext in _enc column
created_server = await server.mcp_manager.create_or_update_mcp_server(mcp_server, actor=default_user)
# Should store in plaintext column
assert db_server.token == token
assert db_server.token_enc is None # No encryption
# Check database - should store plaintext in _enc column
async with db_registry.async_session() as session:
result = await session.execute(select(ORMMCPServer).where(ORMMCPServer.id == created_server.id))
db_server = result.scalar_one()
# Clean up
await server.mcp_manager.delete_mcp_server_by_id(created_server.id, actor=default_user)
# Token should be stored as plaintext in _enc column (not encrypted)
assert db_server.token_enc == token # Plaintext stored directly
# Legacy plaintext column should also be populated (dual-write)
assert db_server.token == token
# Clean up
await server.mcp_manager.delete_mcp_server_by_id(created_server.id, actor=default_user)
finally:
# Restore original encryption key
settings.encryption_key = original_key
class TestMCPOAuthEncryption:
@@ -408,8 +419,13 @@ class TestMCPOAuthEncryption:
settings.encryption_key = original_key
@pytest.mark.asyncio
async def test_dual_read_backward_compatibility(self, server, default_user):
"""Test that system can read both encrypted and plaintext values (migration support)."""
async def test_encrypted_only_reads(self, server, default_user):
"""Test that system only reads from encrypted columns, ignoring plaintext.
Note: In Phase 1 of migration, reads are encrypted-only. Plaintext columns
are ignored even if they contain values. This test verifies that the
encrypted value is used and plaintext is never used as fallback.
"""
# Set encryption key directly on settings
original_key = settings.encryption_key
settings.encryption_key = self.MOCK_ENCRYPTION_KEY
@@ -426,10 +442,10 @@ class TestMCPOAuthEncryption:
id=session_id,
state=f"dual-read-state-{uuid4().hex[:8]}",
server_url="https://test.com/mcp",
server_name="Dual Read Test",
server_name="Encrypted Only Read Test",
# Both encrypted and plaintext values
access_token=plaintext_token, # Legacy plaintext
access_token_enc=encrypted_new, # New encrypted
access_token=plaintext_token, # Legacy plaintext - should be ignored
access_token_enc=encrypted_new, # Encrypted value - should be used
client_id="test-client",
user_id=default_user.id,
organization_id=default_user.organization_id,
@@ -443,7 +459,7 @@ class TestMCPOAuthEncryption:
test_session = await server.mcp_manager.get_oauth_session_by_id(session_id, actor=default_user)
assert test_session is not None
# Should prefer encrypted value over plaintext
# Should use encrypted value only (plaintext is ignored)
assert test_session.access_token == new_encrypted_token
# Clean up not needed - test database is reset
@@ -451,3 +467,58 @@ class TestMCPOAuthEncryption:
finally:
# Restore original encryption key
settings.encryption_key = original_key
@pytest.mark.asyncio
async def test_plaintext_only_record_fallback_with_error_logging(self, server, default_user, caplog):
"""Test that records with only plaintext values fall back to plaintext with error logging.
Note: In Phase 1 of migration, if a record only has plaintext value
(no encrypted value), the system falls back to plaintext but logs an error
to help identify unmigrated data.
"""
import logging
# Set encryption key directly on settings
original_key = settings.encryption_key
settings.encryption_key = self.MOCK_ENCRYPTION_KEY
try:
# Insert a record with only plaintext value (no encrypted)
session_id = f"mcp-oauth-{str(uuid4())[:8]}"
plaintext_token = "legacy-plaintext-token"
async with db_registry.async_session() as session:
db_oauth = MCPOAuth(
id=session_id,
state=f"plaintext-only-state-{uuid4().hex[:8]}",
server_url="https://test.com/mcp",
server_name="Plaintext Only Test",
# Only plaintext value, no encrypted
access_token=plaintext_token, # Legacy plaintext - should fallback with error log
access_token_enc=None, # No encrypted value
client_id="test-client",
user_id=default_user.id,
organization_id=default_user.organization_id,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
session.add(db_oauth)
await session.commit()
# Retrieve through manager - should log error about plaintext fallback
with caplog.at_level(logging.ERROR):
test_session = await server.mcp_manager.get_oauth_session_by_id(session_id, actor=default_user)
assert test_session is not None
# Should fall back to plaintext value
assert test_session.access_token == plaintext_token
# Should have logged an error about reading from plaintext column
assert "MIGRATION_NEEDED" in caplog.text
# Clean up not needed - test database is reset
finally:
# Restore original encryption key
settings.encryption_key = original_key

View File

@@ -35,8 +35,8 @@ class TestSecret:
finally:
settings.encryption_key = original_key
def test_from_plaintext_without_key(self):
"""Test creating a Secret from plaintext without encryption key (fallback behavior)."""
def test_from_plaintext_without_key_stores_plaintext(self):
"""Test creating a Secret from plaintext without encryption key stores as plaintext."""
from letta.settings import settings
# Clear encryption key
@@ -46,10 +46,10 @@ class TestSecret:
try:
plaintext = "my-plaintext-value"
# Should now handle gracefully and store as plaintext
# Should store as plaintext in _enc column when no encryption key
secret = Secret.from_plaintext(plaintext)
# Should store the plaintext value
# Should store the plaintext value directly in encrypted_value
assert secret.encrypted_value == plaintext
assert secret.get_plaintext() == plaintext
assert not secret.was_encrypted
@@ -103,8 +103,14 @@ class TestSecret:
finally:
settings.encryption_key = original_key
def test_from_db_with_plaintext_value(self):
"""Test creating a Secret from database with plaintext value (backward compatibility)."""
def test_from_db_with_plaintext_value_fallback(self, caplog):
"""Test creating a Secret from database with only plaintext value falls back with error logging.
Note: In Phase 1 of migration, from_db() prefers encrypted but falls back to plaintext
with error logging to help identify unmigrated data.
"""
import logging
from letta.settings import settings
original_key = settings.encryption_key
@@ -113,13 +119,16 @@ class TestSecret:
try:
plaintext = "legacy-plaintext"
# When only plaintext is provided, should encrypt it
secret = Secret.from_db(encrypted_value=None, plaintext_value=plaintext)
# When only plaintext is provided, should fall back to plaintext with error logging
with caplog.at_level(logging.ERROR):
secret = Secret.from_db(encrypted_value=None, plaintext_value=plaintext)
# Should encrypt the plaintext
assert secret.encrypted_value is not None
assert secret.was_encrypted is False
# Should use the plaintext value (fallback)
assert secret.get_plaintext() == plaintext
# Should have logged an error about reading from plaintext column
assert "MIGRATION_NEEDED" in caplog.text
assert "plaintext column" in caplog.text
finally:
settings.encryption_key = original_key