diff --git a/alembic/versions/a1b2c3d4e5f7_add_blocks_conversations_table.py b/alembic/versions/a1b2c3d4e5f7_add_blocks_conversations_table.py new file mode 100644 index 00000000..57d2a2cf --- /dev/null +++ b/alembic/versions/a1b2c3d4e5f7_add_blocks_conversations_table.py @@ -0,0 +1,48 @@ +"""Add blocks_conversations table for conversation-isolated blocks + +Revision ID: a1b2c3d4e5f7 +Revises: cf3c4d025dbc +Create Date: 2026-01-14 02:22:00.000000 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "a1b2c3d4e5f7" +down_revision: Union[str, None] = "cf3c4d025dbc" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # Create blocks_conversations junction table + op.create_table( + "blocks_conversations", + sa.Column("conversation_id", sa.String(), nullable=False), + sa.Column("block_id", sa.String(), nullable=False), + sa.Column("block_label", sa.String(), nullable=False), + sa.ForeignKeyConstraint( + ["conversation_id"], + ["conversations.id"], + ondelete="CASCADE", + ), + sa.ForeignKeyConstraint( + ["block_id"], + ["block.id"], + ondelete="CASCADE", + ), + sa.PrimaryKeyConstraint("conversation_id", "block_id", "block_label"), + sa.UniqueConstraint("conversation_id", "block_label", name="unique_label_per_conversation"), + sa.UniqueConstraint("conversation_id", "block_id", name="unique_conversation_block"), + ) + op.create_index("ix_blocks_conversations_block_id", "blocks_conversations", ["block_id"], unique=False) + + +def downgrade() -> None: + op.drop_index("ix_blocks_conversations_block_id", table_name="blocks_conversations") + op.drop_table("blocks_conversations") diff --git a/letta/agents/letta_agent_v3.py b/letta/agents/letta_agent_v3.py index afbdbcc3..7b18c89e 100644 --- a/letta/agents/letta_agent_v3.py +++ b/letta/agents/letta_agent_v3.py @@ -130,6 +130,15 @@ class LettaAgentV3(LettaAgentV2): self._initialize_state() self.conversation_id = conversation_id self.client_tools = client_tools or [] + + # Apply conversation-specific block overrides if conversation_id is provided + if conversation_id: + self.agent_state = await ConversationManager().apply_isolated_blocks_to_agent_state( + agent_state=self.agent_state, + conversation_id=conversation_id, + actor=self.actor, + ) + request_span = self._request_checkpoint_start(request_start_timestamp_ns=request_start_timestamp_ns) response_letta_messages = [] @@ -286,6 +295,14 @@ class LettaAgentV3(LettaAgentV2): response_letta_messages = [] first_chunk = True + # Apply conversation-specific block overrides if conversation_id is provided + if conversation_id: + self.agent_state = await ConversationManager().apply_isolated_blocks_to_agent_state( + agent_state=self.agent_state, + conversation_id=conversation_id, + actor=self.actor, + ) + if stream_tokens: llm_adapter = SimpleLLMStreamAdapter( llm_client=self.llm_client, diff --git a/letta/orm/__init__.py b/letta/orm/__init__.py index 47fe5b1d..310e8d2f 100644 --- a/letta/orm/__init__.py +++ b/letta/orm/__init__.py @@ -6,6 +6,7 @@ from letta.orm.base import Base from letta.orm.block import Block from letta.orm.block_history import BlockHistory from letta.orm.blocks_agents import BlocksAgents +from letta.orm.blocks_conversations import BlocksConversations from letta.orm.blocks_tags import BlocksTags from letta.orm.conversation import Conversation from letta.orm.conversation_messages import ConversationMessage diff --git a/letta/orm/blocks_conversations.py b/letta/orm/blocks_conversations.py new file mode 100644 index 00000000..4068af41 --- /dev/null +++ b/letta/orm/blocks_conversations.py @@ -0,0 +1,19 @@ +from sqlalchemy import ForeignKey, Index, String, UniqueConstraint +from sqlalchemy.orm import Mapped, mapped_column + +from letta.orm.base import Base + + +class BlocksConversations(Base): + """Tracks conversation-specific blocks that override agent defaults for isolated memory.""" + + __tablename__ = "blocks_conversations" + __table_args__ = ( + UniqueConstraint("conversation_id", "block_label", name="unique_label_per_conversation"), + UniqueConstraint("conversation_id", "block_id", name="unique_conversation_block"), + Index("ix_blocks_conversations_block_id", "block_id"), + ) + + conversation_id: Mapped[str] = mapped_column(String, ForeignKey("conversations.id", ondelete="CASCADE"), primary_key=True) + block_id: Mapped[str] = mapped_column(String, ForeignKey("block.id", ondelete="CASCADE"), primary_key=True) + block_label: Mapped[str] = mapped_column(String, primary_key=True) diff --git a/letta/orm/conversation.py b/letta/orm/conversation.py index e425256a..a3fe7a9f 100644 --- a/letta/orm/conversation.py +++ b/letta/orm/conversation.py @@ -10,6 +10,7 @@ from letta.schemas.conversation import Conversation as PydanticConversation if TYPE_CHECKING: from letta.orm.agent import Agent + from letta.orm.block import Block from letta.orm.conversation_messages import ConversationMessage @@ -35,6 +36,13 @@ class Conversation(SqlalchemyBase, OrganizationMixin): cascade="all, delete-orphan", lazy="selectin", ) + isolated_blocks: Mapped[List["Block"]] = relationship( + "Block", + secondary="blocks_conversations", + lazy="selectin", + passive_deletes=True, + doc="Conversation-specific blocks that override agent defaults for isolated memory.", + ) def to_pydantic(self) -> PydanticConversation: """Converts the SQLAlchemy model to its Pydantic counterpart.""" @@ -46,4 +54,5 @@ class Conversation(SqlalchemyBase, OrganizationMixin): updated_at=self.updated_at, created_by_id=self.created_by_id, last_updated_by_id=self.last_updated_by_id, + isolated_block_ids=[b.id for b in self.isolated_blocks] if self.isolated_blocks else [], ) diff --git a/letta/schemas/conversation.py b/letta/schemas/conversation.py index 0027938f..c2a94007 100644 --- a/letta/schemas/conversation.py +++ b/letta/schemas/conversation.py @@ -14,12 +14,21 @@ class Conversation(OrmMetadataBase): agent_id: str = Field(..., description="The ID of the agent this conversation belongs to.") summary: Optional[str] = Field(None, description="A summary of the conversation.") in_context_message_ids: List[str] = Field(default_factory=list, description="The IDs of in-context messages for the conversation.") + isolated_block_ids: List[str] = Field( + default_factory=list, + description="IDs of blocks that are isolated (specific to this conversation, overriding agent defaults).", + ) class CreateConversation(BaseModel): """Request model for creating a new conversation.""" summary: Optional[str] = Field(None, description="A summary of the conversation.") + isolated_block_labels: Optional[List[str]] = Field( + None, + description="List of block labels that should be isolated (conversation-specific) rather than shared across conversations. " + "New blocks will be created as copies of the agent's blocks with these labels.", + ) class UpdateConversation(BaseModel): diff --git a/letta/services/agent_manager.py b/letta/services/agent_manager.py index f7b3560b..c18da34a 100644 --- a/letta/services/agent_manager.py +++ b/letta/services/agent_manager.py @@ -1622,7 +1622,9 @@ class AgentManager: updated_value = new_memory.get_block(label).value if updated_value != agent_state.memory.get_block(label).value: # update the block if it's changed - block_id = agent_state.memory.get_block(label).id + # Use block ID from new_memory, not agent_state.memory, because new_memory + # may contain conversation-isolated blocks with different IDs + block_id = new_memory.get_block(label).id await self.block_manager.update_block_async( block_id=block_id, block_update=BlockUpdate(value=updated_value), actor=actor ) diff --git a/letta/services/conversation_manager.py b/letta/services/conversation_manager.py index dade99d4..62239fb4 100644 --- a/letta/services/conversation_manager.py +++ b/letta/services/conversation_manager.py @@ -1,12 +1,22 @@ -from typing import List, Optional +from typing import TYPE_CHECKING, Dict, List, Optional -from sqlalchemy import func, select +if TYPE_CHECKING: + pass +# Import AgentState outside TYPE_CHECKING for @enforce_types decorator +from sqlalchemy import delete, func, select + +from letta.errors import LettaInvalidArgumentError +from letta.orm.agent import Agent as AgentModel +from letta.orm.block import Block as BlockModel +from letta.orm.blocks_conversations import BlocksConversations from letta.orm.conversation import Conversation as ConversationModel from letta.orm.conversation_messages import ConversationMessage as ConversationMessageModel from letta.orm.errors import NoResultFound from letta.orm.message import Message as MessageModel from letta.otel.tracing import trace_method +from letta.schemas.agent import AgentState +from letta.schemas.block import Block as PydanticBlock from letta.schemas.conversation import Conversation as PydanticConversation, CreateConversation, UpdateConversation from letta.schemas.letta_message import LettaMessage from letta.schemas.message import Message as PydanticMessage @@ -26,7 +36,17 @@ class ConversationManager: conversation_create: CreateConversation, actor: PydanticUser, ) -> PydanticConversation: - """Create a new conversation for an agent.""" + """Create a new conversation for an agent. + + Args: + agent_id: The ID of the agent this conversation belongs to + conversation_create: The conversation creation request, optionally including + isolated_block_labels for conversation-specific memory blocks + actor: The user performing the action + + Returns: + The created conversation with isolated_block_ids if any were created + """ async with db_registry.async_session() as session: conversation = ConversationModel( agent_id=agent_id, @@ -34,7 +54,21 @@ class ConversationManager: organization_id=actor.organization_id, ) await conversation.create_async(session, actor=actor) - return conversation.to_pydantic() + + # Handle isolated blocks if requested + isolated_block_ids = [] + if conversation_create.isolated_block_labels: + isolated_block_ids = await self._create_isolated_blocks( + session=session, + conversation=conversation, + agent_id=agent_id, + isolated_block_labels=conversation_create.isolated_block_labels, + actor=actor, + ) + + pydantic_conversation = conversation.to_pydantic() + pydantic_conversation.isolated_block_ids = isolated_block_ids + return pydantic_conversation @enforce_types @trace_method @@ -119,17 +153,30 @@ class ConversationManager: conversation_id: str, actor: PydanticUser, ) -> None: - """Soft delete a conversation.""" + """Soft delete a conversation and hard-delete its isolated blocks.""" async with db_registry.async_session() as session: conversation = await ConversationModel.read_async( db_session=session, identifier=conversation_id, actor=actor, ) - # Soft delete by setting is_deleted flag + + # Get isolated blocks before modifying conversation + isolated_blocks = list(conversation.isolated_blocks) + + # Soft delete the conversation first conversation.is_deleted = True await conversation.update_async(db_session=session, actor=actor) + # Hard-delete isolated blocks (Block model doesn't support soft-delete) + # Following same pattern as block_manager.delete_block_async + for block in isolated_blocks: + # Delete junction table entry first + await session.execute(delete(BlocksConversations).where(BlocksConversations.block_id == block.id)) + await session.flush() + # Then hard-delete the block + await block.hard_delete_async(db_session=session, actor=actor) + # ==================== Message Management Methods ==================== @enforce_types @@ -355,3 +402,144 @@ class ConversationManager: # Convert to LettaMessages return PydanticMessage.to_letta_messages_from_list(messages, reverse=False, text_is_assistant_message=True) + + # ==================== Isolated Blocks Methods ==================== + + async def _create_isolated_blocks( + self, + session, + conversation: ConversationModel, + agent_id: str, + isolated_block_labels: List[str], + actor: PydanticUser, + ) -> List[str]: + """Create conversation-specific copies of blocks for isolated labels. + + Args: + session: The database session + conversation: The conversation model (must be created but not yet committed) + agent_id: The agent ID to get source blocks from + isolated_block_labels: List of block labels to isolate + actor: The user performing the action + + Returns: + List of created block IDs + + Raises: + LettaInvalidArgumentError: If a block label is not found on the agent + """ + # Get the agent with its blocks + agent = await AgentModel.read_async(db_session=session, identifier=agent_id, actor=actor) + + # Map of label -> agent block + agent_blocks_by_label = {block.label: block for block in agent.core_memory} + + created_block_ids = [] + for label in isolated_block_labels: + if label not in agent_blocks_by_label: + raise LettaInvalidArgumentError( + f"Block with label '{label}' not found on agent '{agent_id}'", + argument_name="isolated_block_labels", + ) + + source_block = agent_blocks_by_label[label] + + # Create a copy of the block with a new ID using Pydantic schema (which auto-generates ID) + new_block_pydantic = PydanticBlock( + label=source_block.label, + description=source_block.description, + value=source_block.value, + limit=source_block.limit, + metadata=source_block.metadata_, + read_only=source_block.read_only, + ) + + # Convert to ORM model + block_data = new_block_pydantic.model_dump(to_orm=True, exclude_none=True) + new_block = BlockModel(**block_data, organization_id=actor.organization_id) + await new_block.create_async(session, actor=actor) + + # Create the junction table entry + blocks_conv = BlocksConversations( + conversation_id=conversation.id, + block_id=new_block.id, + block_label=label, + ) + session.add(blocks_conv) + created_block_ids.append(new_block.id) + + return created_block_ids + + @enforce_types + @trace_method + async def get_isolated_blocks_for_conversation( + self, + conversation_id: str, + actor: PydanticUser, + ) -> Dict[str, PydanticBlock]: + """Get isolated blocks for a conversation, keyed by label. + + Args: + conversation_id: The conversation ID + actor: The user performing the action + + Returns: + Dictionary mapping block labels to their conversation-specific blocks + """ + async with db_registry.async_session() as session: + conversation = await ConversationModel.read_async( + db_session=session, + identifier=conversation_id, + actor=actor, + check_is_deleted=True, + ) + return {block.label: block.to_pydantic() for block in conversation.isolated_blocks} + + @enforce_types + @trace_method + async def apply_isolated_blocks_to_agent_state( + self, + agent_state: "AgentState", + conversation_id: str, + actor: PydanticUser, + ) -> "AgentState": + """Apply conversation-specific block overrides to an agent state. + + This method modifies the agent_state.memory to replace blocks that have + conversation-specific isolated versions. + + Args: + agent_state: The agent state to modify (will be modified in place) + conversation_id: The conversation ID to get isolated blocks from + actor: The user performing the action + + Returns: + The modified agent state (same object, modified in place) + """ + from letta.schemas.memory import Memory + + # Get conversation's isolated blocks + isolated_blocks = await self.get_isolated_blocks_for_conversation( + conversation_id=conversation_id, + actor=actor, + ) + + if not isolated_blocks: + return agent_state + + # Override agent's blocks with conversation-specific blocks + memory_blocks = [] + for block in agent_state.memory.blocks: + if block.label in isolated_blocks: + memory_blocks.append(isolated_blocks[block.label]) + else: + memory_blocks.append(block) + + # Create new Memory with overridden blocks + agent_state.memory = Memory( + blocks=memory_blocks, + file_blocks=agent_state.memory.file_blocks, + agent_type=agent_state.memory.agent_type, + ) + + return agent_state diff --git a/tests/managers/test_conversation_manager.py b/tests/managers/test_conversation_manager.py index 4fc3d4e4..43999091 100644 --- a/tests/managers/test_conversation_manager.py +++ b/tests/managers/test_conversation_manager.py @@ -504,3 +504,248 @@ async def test_list_conversation_messages_pagination(conversation_manager, serve after=messages[0].id, ) assert len(letta_messages_after) == 4 # Should get messages 1-4 + + +# ====================================================================================================================== +# Isolated Blocks Tests +# ====================================================================================================================== + + +@pytest.mark.asyncio +async def test_create_conversation_with_isolated_blocks(conversation_manager, server: SyncServer, charles_agent, default_user): + """Test creating a conversation with isolated block labels.""" + # Get the agent's blocks to know what labels exist + agent_state = await server.agent_manager.get_agent_by_id_async(charles_agent.id, default_user, include_relationships=["memory"]) + block_labels = [block.label for block in agent_state.memory.blocks] + assert len(block_labels) > 0, "Agent should have at least one block" + + # Create conversation with isolated blocks + first_label = block_labels[0] + conversation = await conversation_manager.create_conversation( + agent_id=charles_agent.id, + conversation_create=CreateConversation( + summary="Test with isolated blocks", + isolated_block_labels=[first_label], + ), + actor=default_user, + ) + + assert conversation.id is not None + assert conversation.agent_id == charles_agent.id + assert len(conversation.isolated_block_ids) == 1 + + # Verify the isolated block was created + isolated_blocks = await conversation_manager.get_isolated_blocks_for_conversation( + conversation_id=conversation.id, + actor=default_user, + ) + assert first_label in isolated_blocks + assert isolated_blocks[first_label].label == first_label + + +@pytest.mark.asyncio +async def test_isolated_blocks_have_different_ids(conversation_manager, server: SyncServer, charles_agent, default_user): + """Test that isolated blocks have different IDs from agent's original blocks.""" + # Get the agent's blocks + agent_state = await server.agent_manager.get_agent_by_id_async(charles_agent.id, default_user, include_relationships=["memory"]) + original_block = agent_state.memory.blocks[0] + + # Create conversation with isolated block + conversation = await conversation_manager.create_conversation( + agent_id=charles_agent.id, + conversation_create=CreateConversation( + summary="Test isolated block IDs", + isolated_block_labels=[original_block.label], + ), + actor=default_user, + ) + + # Get the isolated blocks + isolated_blocks = await conversation_manager.get_isolated_blocks_for_conversation( + conversation_id=conversation.id, + actor=default_user, + ) + + # Verify the isolated block has a different ID + isolated_block = isolated_blocks[original_block.label] + assert isolated_block.id != original_block.id + assert isolated_block.label == original_block.label + assert isolated_block.value == original_block.value # Same initial value + + +@pytest.mark.asyncio +async def test_isolated_blocks_are_conversation_specific(conversation_manager, server: SyncServer, charles_agent, default_user): + """Test that isolated blocks are specific to each conversation.""" + # Get the agent's first block label + agent_state = await server.agent_manager.get_agent_by_id_async(charles_agent.id, default_user, include_relationships=["memory"]) + block_label = agent_state.memory.blocks[0].label + + # Create two conversations with the same isolated block label + conv1 = await conversation_manager.create_conversation( + agent_id=charles_agent.id, + conversation_create=CreateConversation( + summary="Conversation 1", + isolated_block_labels=[block_label], + ), + actor=default_user, + ) + + conv2 = await conversation_manager.create_conversation( + agent_id=charles_agent.id, + conversation_create=CreateConversation( + summary="Conversation 2", + isolated_block_labels=[block_label], + ), + actor=default_user, + ) + + # Get isolated blocks for both conversations + isolated_blocks_1 = await conversation_manager.get_isolated_blocks_for_conversation( + conversation_id=conv1.id, + actor=default_user, + ) + isolated_blocks_2 = await conversation_manager.get_isolated_blocks_for_conversation( + conversation_id=conv2.id, + actor=default_user, + ) + + # Verify they have different block IDs + block_1 = isolated_blocks_1[block_label] + block_2 = isolated_blocks_2[block_label] + assert block_1.id != block_2.id + + +@pytest.mark.asyncio +async def test_create_conversation_invalid_block_label(conversation_manager, server: SyncServer, charles_agent, default_user): + """Test that creating a conversation with non-existent block label raises error.""" + from letta.errors import LettaInvalidArgumentError + + with pytest.raises(LettaInvalidArgumentError) as exc_info: + await conversation_manager.create_conversation( + agent_id=charles_agent.id, + conversation_create=CreateConversation( + summary="Test invalid label", + isolated_block_labels=["nonexistent_block_label"], + ), + actor=default_user, + ) + + assert "nonexistent_block_label" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_apply_isolated_blocks_to_agent_state(conversation_manager, server: SyncServer, charles_agent, default_user): + """Test that isolated blocks are correctly applied to agent state.""" + # Get the original agent state + original_agent_state = await server.agent_manager.get_agent_by_id_async( + charles_agent.id, default_user, include_relationships=["memory"] + ) + original_block = original_agent_state.memory.blocks[0] + + # Create conversation with isolated block + conversation = await conversation_manager.create_conversation( + agent_id=charles_agent.id, + conversation_create=CreateConversation( + summary="Test apply isolated blocks", + isolated_block_labels=[original_block.label], + ), + actor=default_user, + ) + + # Get fresh agent state + agent_state = await server.agent_manager.get_agent_by_id_async(charles_agent.id, default_user, include_relationships=["memory"]) + + # Apply isolated blocks + modified_state = await conversation_manager.apply_isolated_blocks_to_agent_state( + agent_state=agent_state, + conversation_id=conversation.id, + actor=default_user, + ) + + # Verify the block was replaced + modified_block = modified_state.memory.get_block(original_block.label) + assert modified_block.id != original_block.id + assert modified_block.label == original_block.label + assert modified_block.id in conversation.isolated_block_ids + + +@pytest.mark.asyncio +async def test_conversation_without_isolated_blocks(conversation_manager, server: SyncServer, sarah_agent, default_user): + """Test that creating a conversation without isolated blocks works normally.""" + conversation = await conversation_manager.create_conversation( + agent_id=sarah_agent.id, + conversation_create=CreateConversation(summary="No isolated blocks"), + actor=default_user, + ) + + assert conversation.id is not None + assert conversation.isolated_block_ids == [] + + isolated_blocks = await conversation_manager.get_isolated_blocks_for_conversation( + conversation_id=conversation.id, + actor=default_user, + ) + assert isolated_blocks == {} + + +@pytest.mark.asyncio +async def test_apply_no_isolated_blocks_preserves_state(conversation_manager, server: SyncServer, charles_agent, default_user): + """Test that applying isolated blocks to a conversation without them preserves original state.""" + # Create conversation without isolated blocks + conversation = await conversation_manager.create_conversation( + agent_id=charles_agent.id, + conversation_create=CreateConversation(summary="No isolated blocks"), + actor=default_user, + ) + + # Get agent state + agent_state = await server.agent_manager.get_agent_by_id_async(charles_agent.id, default_user, include_relationships=["memory"]) + original_block_ids = [block.id for block in agent_state.memory.blocks] + + # Apply isolated blocks (should be a no-op) + modified_state = await conversation_manager.apply_isolated_blocks_to_agent_state( + agent_state=agent_state, + conversation_id=conversation.id, + actor=default_user, + ) + + # Verify blocks are unchanged + modified_block_ids = [block.id for block in modified_state.memory.blocks] + assert original_block_ids == modified_block_ids + + +@pytest.mark.asyncio +async def test_delete_conversation_cleans_up_isolated_blocks(conversation_manager, server: SyncServer, charles_agent, default_user): + """Test that deleting a conversation also hard-deletes its isolated blocks.""" + # Get the agent's first block label + agent_state = await server.agent_manager.get_agent_by_id_async(charles_agent.id, default_user, include_relationships=["memory"]) + block_label = agent_state.memory.blocks[0].label + + # Create conversation with isolated block + conversation = await conversation_manager.create_conversation( + agent_id=charles_agent.id, + conversation_create=CreateConversation( + summary="Test delete cleanup", + isolated_block_labels=[block_label], + ), + actor=default_user, + ) + + # Get the isolated block ID + isolated_block_ids = conversation.isolated_block_ids + assert len(isolated_block_ids) == 1 + isolated_block_id = isolated_block_ids[0] + + # Verify the isolated block exists + isolated_block = await server.block_manager.get_block_by_id_async(isolated_block_id, default_user) + assert isolated_block is not None + + # Delete the conversation + await conversation_manager.delete_conversation( + conversation_id=conversation.id, + actor=default_user, + ) + + # Verify the isolated block was hard-deleted + deleted_block = await server.block_manager.get_block_by_id_async(isolated_block_id, default_user) + assert deleted_block is None