diff --git a/letta/constants.py b/letta/constants.py index bf0fa3b6..a06cd906 100644 --- a/letta/constants.py +++ b/letta/constants.py @@ -89,7 +89,7 @@ SEND_MESSAGE_TOOL_NAME = "send_message" BASE_TOOLS = [SEND_MESSAGE_TOOL_NAME, "conversation_search", "archival_memory_insert", "archival_memory_search"] DEPRECATED_LETTA_TOOLS = ["archival_memory_insert", "archival_memory_search"] # Base memory tools CAN be edited, and are added by default by the server -BASE_MEMORY_TOOLS = ["core_memory_append", "core_memory_replace"] +BASE_MEMORY_TOOLS = ["core_memory_append", "core_memory_replace", "memory"] # New v2 collection of the base memory tools (effecitvely same as sleeptime set), to pair with memgpt_v2 prompt BASE_MEMORY_TOOLS_V2 = [ "memory_replace", @@ -98,6 +98,11 @@ BASE_MEMORY_TOOLS_V2 = [ # "memory_rethink", # "memory_finish_edits", ] + +# v3 collection, currently just a omni memory tool for anthropic +BASE_MEMORY_TOOLS_V3 = [ + "memory", +] # Base tools if the memgpt agent has enable_sleeptime on BASE_SLEEPTIME_CHAT_TOOLS = [SEND_MESSAGE_TOOL_NAME, "conversation_search", "archival_memory_search"] # Base memory tools for sleeptime agent @@ -118,6 +123,7 @@ BASE_VOICE_SLEEPTIME_TOOLS = [ "rethink_user_memory", "finish_rethinking_memory", ] + # Multi agent tools MULTI_AGENT_TOOLS = ["send_message_to_agent_and_wait_for_reply", "send_message_to_agents_matching_tags", "send_message_to_agent_async"] LOCAL_ONLY_MULTI_AGENT_TOOLS = ["send_message_to_agent_async"] diff --git a/letta/functions/function_sets/base.py b/letta/functions/function_sets/base.py index 345d42dd..6cb4c795 100644 --- a/letta/functions/function_sets/base.py +++ b/letta/functions/function_sets/base.py @@ -1,7 +1,82 @@ -from typing import List, Literal, Optional +from typing import TYPE_CHECKING, Any, List, Literal, Optional from letta.constants import CORE_MEMORY_LINE_NUMBER_WARNING +if TYPE_CHECKING: + from letta.schemas.agent import AgentState + + +def memory( + agent_state: "AgentState", + command: str, + path: Optional[str] = None, + file_text: Optional[str] = None, + description: Optional[str] = None, + old_str: Optional[str] = None, + new_str: Optional[str] = None, + insert_line: Optional[int] = None, + insert_text: Optional[str] = None, + old_path: Optional[str] = None, + new_path: Optional[str] = None, +) -> Optional[str]: + """ + Memory management tool with various sub-commands for memory block operations. + + Args: + command (str): The sub-command to execute. Supported commands: + - "view": List memory blocks or view specific block content + - "create": Create a new memory block + - "str_replace": Replace text in a memory block + - "insert": Insert text at a specific line in a memory block + - "delete": Delete a memory block + - "rename": Rename a memory block + path (Optional[str]): Path to the memory block (for str_replace, insert, delete) + file_text (Optional[str]): The value to set in the memory block (for create) + description (Optional[str]): The description to set in the memory block (for create, rename) + old_str (Optional[str]): Old text to replace (for str_replace) + new_str (Optional[str]): New text to replace with (for str_replace) + insert_line (Optional[int]): Line number to insert at (for insert) + insert_text (Optional[str]): Text to insert (for insert) + old_path (Optional[str]): Old path for rename operation + new_path (Optional[str]): New path for rename operation + view_range (Optional[int]): Range of lines to view (for view) + + Returns: + Optional[str]: Success message or error description + + Examples: + # List all memory blocks + memory(agent_state, "view", path="/memories") + + # View specific memory block content + memory(agent_state, "view", path="/memories/user_preferences") + + # View first 10 lines of a memory block + memory(agent_state, "view", path="/memories/user_preferences", view_range=10) + + # Replace text in a memory block + memory(agent_state, "str_replace", path="/memories/user_preferences", old_str="theme: dark", new_str="theme: light") + + # Insert text at line 5 + memory(agent_state, "insert", path="/memories/notes", insert_line=5, insert_text="New note here") + + # Delete a memory block + memory(agent_state, "delete", path="/memories/old_notes") + + # Rename a memory block + memory(agent_state, "rename", old_path="/memories/temp", new_path="/memories/permanent") + + # Update the description of a memory block + memory(agent_state, "rename", path="/memories/temp", description="The user's temporary notes.") + + # Create a memory block with starting text + memory(agent_state, "create", path="/memories/coding_preferences", "description": "The user's coding preferences.", "file_text": "The user seems to add type hints to all of their Python code.") + + # Create an empty memory block + memory(agent_state, "create", path="/memories/coding_preferences", "description": "The user's coding preferences.") + """ + raise NotImplementedError("This should never be invoked directly. Contact Letta if you see this error message.") + def send_message(self: "Agent", message: str) -> Optional[str]: """ @@ -201,7 +276,10 @@ def rethink_memory(agent_state: "AgentState", new_memory: str, target_block_labe """ if agent_state.memory.get_block(target_block_label) is None: - agent_state.memory.create_block(label=target_block_label, value=new_memory) + from letta.schemas.block import Block + + new_block = Block(label=target_block_label, value=new_memory) + agent_state.memory.set_block(new_block) agent_state.memory.update_block_value(label=target_block_label, value=new_memory) return None @@ -394,7 +472,10 @@ def memory_rethink(agent_state: "AgentState", label: str, new_memory: str) -> No ) if agent_state.memory.get_block(label) is None: - agent_state.memory.create_block(label=label, value=new_memory) + from letta.schemas.block import Block + + new_block = Block(label=label, value=new_memory) + agent_state.memory.set_block(new_block) agent_state.memory.update_block_value(label=label, value=new_memory) diff --git a/letta/services/agent_manager.py b/letta/services/agent_manager.py index 844e3335..d6b62a15 100644 --- a/letta/services/agent_manager.py +++ b/letta/services/agent_manager.py @@ -10,6 +10,7 @@ from sqlalchemy.dialects.postgresql import insert as pg_insert from letta.constants import ( BASE_MEMORY_TOOLS, BASE_MEMORY_TOOLS_V2, + BASE_MEMORY_TOOLS_V3, BASE_SLEEPTIME_CHAT_TOOLS, BASE_SLEEPTIME_TOOLS, BASE_TOOLS, @@ -439,7 +440,7 @@ class AgentManager: for tn in tool_names: if tn in {"send_message", "send_message_to_agent_async", "memory_finish_edits"}: tool_rules.append(TerminalToolRule(tool_name=tn)) - elif tn in (BASE_TOOLS + BASE_MEMORY_TOOLS + BASE_MEMORY_TOOLS_V2 + BASE_SLEEPTIME_TOOLS): + elif tn in (BASE_TOOLS + BASE_MEMORY_TOOLS + BASE_MEMORY_TOOLS_V2 + BASE_MEMORY_TOOLS_V3 + BASE_SLEEPTIME_TOOLS): tool_rules.append(ContinueToolRule(tool_name=tn)) for tool_with_requires_approval in requires_approval: @@ -1365,19 +1366,22 @@ class AgentManager: ) if new_memory_str not in system_message.content[0].text: # update the blocks (LRW) in the DB - for label in agent_state.memory.list_block_labels(): - updated_value = new_memory.get_block(label).value - if updated_value != agent_state.memory.get_block(label).value: - # update the block if it's changed - block_id = agent_state.memory.get_block(label).id - await self.block_manager.update_block_async( - block_id=block_id, block_update=BlockUpdate(value=updated_value), actor=actor - ) + for label in new_memory.list_block_labels(): + if label in agent_state.memory.list_block_labels(): + # Block exists in both old and new memory - check if value changed + updated_value = new_memory.get_block(label).value + if updated_value != agent_state.memory.get_block(label).value: + # update the block if it's changed + block_id = agent_state.memory.get_block(label).id + await self.block_manager.update_block_async( + block_id=block_id, block_update=BlockUpdate(value=updated_value), actor=actor + ) - # refresh memory from DB (using block ids) - blocks = await self.block_manager.get_all_blocks_by_ids_async( - block_ids=[b.id for b in agent_state.memory.get_blocks()], actor=actor - ) + # Note: New blocks are already persisted in the creation methods, + # so we don't need to handle them here + + # refresh memory from DB (using block ids from the new memory) + blocks = await self.block_manager.get_all_blocks_by_ids_async(block_ids=[b.id for b in new_memory.get_blocks()], actor=actor) agent_state.memory = Memory( blocks=blocks, diff --git a/letta/services/tool_executor/core_tool_executor.py b/letta/services/tool_executor/core_tool_executor.py index a2d0b09b..4c381fdb 100644 --- a/letta/services/tool_executor/core_tool_executor.py +++ b/letta/services/tool_executor/core_tool_executor.py @@ -11,6 +11,7 @@ from letta.constants import ( from letta.helpers.json_helpers import json_dumps from letta.log import get_logger from letta.schemas.agent import AgentState +from letta.schemas.block import BlockUpdate from letta.schemas.enums import MessageRole, TagMatchMode from letta.schemas.sandbox_config import SandboxConfig from letta.schemas.tool import Tool @@ -46,8 +47,11 @@ class LettaCoreToolExecutor(ToolExecutor): "core_memory_replace": self.core_memory_replace, "memory_replace": self.memory_replace, "memory_insert": self.memory_insert, + "memory_str_replace": self.memory_str_replace, + "memory_str_insert": self.memory_str_insert, "memory_rethink": self.memory_rethink, "memory_finish_edits": self.memory_finish_edits, + "memory": self.memory, } if function_name not in function_map: @@ -479,8 +483,14 @@ class LettaCoreToolExecutor(ToolExecutor): "are for display purposes only)." ) - if agent_state.memory.get_block(label) is None: - agent_state.memory.create_block(label=label, value=new_memory) + try: + agent_state.memory.get_block(label) + except KeyError: + # Block doesn't exist, create it + from letta.schemas.block import Block + + new_block = Block(label=label, value=new_memory) + agent_state.memory.set_block(new_block) agent_state.memory.update_block_value(label=label, value=new_memory) @@ -502,3 +512,388 @@ class LettaCoreToolExecutor(ToolExecutor): async def memory_finish_edits(self, agent_state: AgentState, actor: User) -> None: return None + + async def memory_delete(self, agent_state: AgentState, actor: User, path: str) -> str: + """Delete a memory block by detaching it from the agent.""" + # Extract memory block label from path + label = path.removeprefix("/memories/").replace("/", "_") + + try: + # Check if memory block exists + memory_block = agent_state.memory.get_block(label) + if memory_block is None: + raise ValueError(f"Error: Memory block '{label}' does not exist") + + # Detach the block from the agent + updated_agent_state = await self.agent_manager.detach_block_async( + agent_id=agent_state.id, block_id=memory_block.id, actor=actor + ) + + # Update the agent state with the updated memory from the database + agent_state.memory = updated_agent_state.memory + + return f"Successfully deleted memory block '{label}'" + + except Exception as e: + return f"Error performing delete: {str(e)}" + + async def memory_update_description(self, agent_state: AgentState, actor: User, path: str, description: str) -> str: + """Update the description of a memory block.""" + label = path.removeprefix("/memories/").replace("/", "_") + + try: + # Check if old memory block exists + memory_block = agent_state.memory.get_block(label) + if memory_block is None: + raise ValueError(f"Error: Memory block '{label}' does not exist") + + await self.block_manager.update_block_async( + block_id=memory_block.id, block_update=BlockUpdate(description=description), actor=actor + ) + await self.agent_manager.rebuild_system_prompt_async(agent_id=agent_state.id, actor=actor, force=True) + + return f"Successfully updated description of memory block '{label}'" + + except Exception as e: + raise Exception(f"Error performing update_description: {str(e)}") + + async def memory_rename(self, agent_state: AgentState, actor: User, old_path: str, new_path: str) -> str: + """Rename a memory block by copying content to new label and detaching old one.""" + # Extract memory block labels from paths + old_label = old_path.removeprefix("/memories/").replace("/", "_") + new_label = new_path.removeprefix("/memories/").replace("/", "_") + + try: + # Check if old memory block exists + memory_block = agent_state.memory.get_block(old_label) + if memory_block is None: + raise ValueError(f"Error: Memory block '{old_label}' does not exist") + + await self.block_manager.update_block_async(block_id=memory_block.id, block_update=BlockUpdate(label=new_label), actor=actor) + await self.agent_manager.rebuild_system_prompt_async(agent_id=agent_state.id, actor=actor, force=True) + + return f"Successfully renamed memory block '{old_label}' to '{new_label}'" + + except Exception as e: + raise Exception(f"Error performing rename: {str(e)}") + + async def memory_view(self, agent_state: AgentState, actor: User, path: str, view_range: Optional[int] = None) -> str: + """View the content of a memory block with optional line range.""" + try: + # Special case: if path is "/memories", list all blocks + if path == "/memories": + blocks = agent_state.memory.get_blocks() + + if not blocks: + raise ValueError("No memory blocks found.") + + result_lines = [f"Found {len(blocks)} memory block(s):\n"] + + for i, block in enumerate(blocks, 1): + content = str(block.value) + content_length = len(content) + line_count = len(content.split("\n")) if content else 0 + + # Basic info + block_info = [f"{i}. {block.label}"] + + # Add description if available + if block.description: + block_info.append(f" Description: {block.description}") + + # Add read-only status + if block.read_only: + block_info.append(" Read-only: true") + + # Add content stats + block_info.append(f" Character limit: {block.limit}") + block_info.append(f" Current length: {content_length} characters") + block_info.append(f" Lines: {line_count}") + + # Add content preview (first 100 characters) + if content: + preview = content[:100].replace("\n", "\\n") + if len(content) > 100: + preview += "..." + block_info.append(f" Preview: {preview}") + else: + block_info.append(" Preview: (empty)") + + result_lines.append("\n".join(block_info)) + if i < len(blocks): # Add separator between blocks + result_lines.append("") + + return "\n".join(result_lines) + + # Extract memory block label from path (e.g., "/memories/preferences.txt" -> "preferences.txt") + if path.startswith("/memories/"): + label = path[10:] # Remove "/memories/" prefix + else: + label = path + + # Get the memory block + memory_block = agent_state.memory.get_block(label) + if memory_block is None: + raise ValueError(f"Error: Memory block '{label}' does not exist") + + # Get the content + content = str(memory_block.value) + if not content: + raise ValueError(f"Memory block '{label}' is empty") + + # Split content into lines + lines = content.split("\n") + total_lines = len(lines) + + # Handle view_range parameter + if view_range is not None: + if view_range <= 0: + raise ValueError(f"Error: view_range must be positive, got {view_range}") + + # Show only the first view_range lines + lines_to_show = lines[:view_range] + range_info = f" (showing first {view_range} of {total_lines} lines)" + else: + lines_to_show = lines + range_info = f" ({total_lines} lines total)" + + # Format output with line numbers + numbered_lines = [] + for i, line in enumerate(lines_to_show, start=1): + numbered_lines.append(f"Line {i}: {line}") + + numbered_content = "\n".join(numbered_lines) + + # Add metadata information + metadata_info = [] + if memory_block.description: + metadata_info.append(f"Description: {memory_block.description}") + if memory_block.read_only: + metadata_info.append("Read-only: true") + metadata_info.append(f"Character limit: {memory_block.limit}") + metadata_info.append(f"Current length: {len(content)} characters") + + metadata_str = "\n".join(metadata_info) + + result = f"Memory block: {label}{range_info}\n" + result += f"Metadata:\n{metadata_str}\n\n" + result += f"Content:\n{numbered_content}" + + return result + + except KeyError: + raise ValueError(f"Error: Memory block '{label}' does not exist") + except Exception as e: + raise Exception(f"Error viewing memory block: {str(e)}") + + async def memory_create( + self, agent_state: AgentState, actor: User, path: str, description: str, file_text: Optional[str] = None + ) -> str: + """Create a memory block by setting its value to an empty string.""" + from letta.schemas.block import Block + + label = path.removeprefix("/memories/").replace("/", "_") + + # Create a new block and persist it to the database + new_block = Block(label=label, value=file_text if file_text else "", description=description) + persisted_block = await self.block_manager.create_or_update_block_async(new_block, actor) + + # Attach the block to the agent + await self.agent_manager.attach_block_async(agent_id=agent_state.id, block_id=persisted_block.id, actor=actor) + + # Add the persisted block to memory + agent_state.memory.set_block(persisted_block) + + await self.agent_manager.update_memory_if_changed_async(agent_id=agent_state.id, new_memory=agent_state.memory, actor=actor) + return f"Successfully created memory block '{label}'" + + async def memory_str_replace(self, agent_state: AgentState, actor: User, path: str, old_str: str, new_str: str) -> str: + """Replace text in a memory block.""" + label = path.removeprefix("/memories/").replace("/", "_") + + memory_block = agent_state.memory.get_block(label) + if memory_block is None: + raise ValueError(f"Error: Memory block '{label}' does not exist") + + if memory_block.read_only: + raise ValueError(f"{READ_ONLY_BLOCK_EDIT_ERROR}") + + if bool(MEMORY_TOOLS_LINE_NUMBER_PREFIX_REGEX.search(old_str)): + raise ValueError( + "old_str contains a line number prefix, which is not allowed. " + "Do not include line numbers when calling memory tools (line " + "numbers are for display purposes only)." + ) + if CORE_MEMORY_LINE_NUMBER_WARNING in old_str: + raise ValueError( + "old_str contains a line number warning, which is not allowed. " + "Do not include line number information when calling memory tools " + "(line numbers are for display purposes only)." + ) + if bool(MEMORY_TOOLS_LINE_NUMBER_PREFIX_REGEX.search(new_str)): + raise ValueError( + "new_str contains a line number prefix, which is not allowed. " + "Do not include line numbers when calling memory tools (line " + "numbers are for display purposes only)." + ) + + old_str = str(old_str).expandtabs() + new_str = str(new_str).expandtabs() + current_value = str(memory_block.value).expandtabs() + + # Check if old_str is unique in the block + occurences = current_value.count(old_str) + if occurences == 0: + raise ValueError( + f"No replacement was performed, old_str `{old_str}` did not appear verbatim in memory block with label `{label}`." + ) + elif occurences > 1: + content_value_lines = current_value.split("\n") + lines = [idx + 1 for idx, line in enumerate(content_value_lines) if old_str in line] + raise ValueError( + f"No replacement was performed. Multiple occurrences of old_str `{old_str}` in lines {lines}. Please ensure it is unique." + ) + + # Replace old_str with new_str + new_value = current_value.replace(str(old_str), str(new_str)) + + # Write the new content to the block + await self.block_manager.update_block_async(block_id=memory_block.id, block_update=BlockUpdate(value=new_value), actor=actor) + await self.agent_manager.rebuild_system_prompt_async(agent_id=agent_state.id, actor=actor, force=True) + + # Prepare the success message + success_msg = f"The core memory block with label `{label}` has been edited. " + success_msg += ( + "Review the changes and make sure they are as expected (correct indentation, " + "no duplicate lines, etc). Edit the memory block again if necessary." + ) + + return success_msg + + async def memory_str_insert(self, agent_state: AgentState, actor: User, path: str, insert_text: str, insert_line: int = -1) -> str: + """Insert text into a memory block at a specific line.""" + label = path.removeprefix("/memories/").replace("/", "_") + + memory_block = agent_state.memory.get_block(label) + if memory_block is None: + raise ValueError(f"Error: Memory block '{label}' does not exist") + + if memory_block.read_only: + raise ValueError(f"{READ_ONLY_BLOCK_EDIT_ERROR}") + + if bool(MEMORY_TOOLS_LINE_NUMBER_PREFIX_REGEX.search(insert_text)): + raise ValueError( + "insert_text contains a line number prefix, which is not allowed. " + "Do not include line numbers when calling memory tools (line " + "numbers are for display purposes only)." + ) + if CORE_MEMORY_LINE_NUMBER_WARNING in insert_text: + raise ValueError( + "insert_text contains a line number warning, which is not allowed. " + "Do not include line number information when calling memory tools " + "(line numbers are for display purposes only)." + ) + + current_value = str(memory_block.value).expandtabs() + insert_text = str(insert_text).expandtabs() + current_value_lines = current_value.split("\n") + n_lines = len(current_value_lines) + + # Check if we're in range, from 0 (pre-line), to 1 (first line), to n_lines (last line) + if insert_line == -1: + insert_line = n_lines + elif insert_line < 0 or insert_line > n_lines: + raise ValueError( + f"Invalid `insert_line` parameter: {insert_line}. It should be within " + f"the range of lines of the memory block: {[0, n_lines]}, or -1 to " + f"append to the end of the memory block." + ) + + # Insert the new text as a line + SNIPPET_LINES = 3 + insert_text_lines = insert_text.split("\n") + new_value_lines = current_value_lines[:insert_line] + insert_text_lines + current_value_lines[insert_line:] + snippet_lines = ( + current_value_lines[max(0, insert_line - SNIPPET_LINES) : insert_line] + + insert_text_lines + + current_value_lines[insert_line : insert_line + SNIPPET_LINES] + ) + + # Collate into the new value to update + new_value = "\n".join(new_value_lines) + snippet = "\n".join(snippet_lines) + + # Write into the block + await self.block_manager.update_block_async(block_id=memory_block.id, block_update=BlockUpdate(value=new_value), actor=actor) + await self.agent_manager.rebuild_system_prompt_async(agent_id=agent_state.id, actor=actor, force=True) + + # Prepare the success message + success_msg = f"The core memory block with label `{label}` has been edited. " + success_msg += ( + "Review the changes and make sure they are as expected (correct indentation, " + "no duplicate lines, etc). Edit the memory block again if necessary." + ) + + return success_msg + + async def memory( + self, + agent_state: AgentState, + actor: User, + command: str, + file_text: Optional[str] = None, + description: Optional[str] = None, + path: Optional[str] = None, + old_str: Optional[str] = None, + new_str: Optional[str] = None, + insert_line: Optional[int] = None, + insert_text: Optional[str] = None, + old_path: Optional[str] = None, + new_path: Optional[str] = None, + view_range: Optional[int] = None, + ) -> Optional[str]: + if command == "view": + if path is None: + raise ValueError("Error: path is required for view command") + return await self.memory_view(agent_state, actor, path, view_range) + + elif command == "create": + if path is None: + raise ValueError("Error: path is required for create command") + if description is None: + raise ValueError("Error: description is required for create command") + return await self.memory_create(agent_state, actor, path, description, file_text) + + elif command == "str_replace": + if path is None: + raise ValueError("Error: path is required for str_replace command") + if old_str is None: + raise ValueError("Error: old_str is required for str_replace command") + if new_str is None: + raise ValueError("Error: new_str is required for str_replace command") + return await self.memory_str_replace(agent_state, actor, path, old_str, new_str) + + elif command == "insert": + if path is None: + raise ValueError("Error: path is required for insert command") + if insert_text is None: + raise ValueError("Error: insert_text is required for insert command") + return await self.memory_str_insert(agent_state, actor, path, insert_text, insert_line) + + elif command == "delete": + if path is None: + raise ValueError("Error: path is required for delete command") + return await self.memory_delete(agent_state, actor, path) + + elif command == "rename": + if path and description: + return await self.memory_update_description(agent_state, actor, path, description) + elif old_path and new_path: + return await self.memory_rename(agent_state, actor, old_path, new_path) + else: + raise ValueError( + "Error: path and description are required for update_description command, or old_path and new_path are required for rename command" + ) + + else: + raise ValueError(f"Error: Unknown command '{command}'. Supported commands: str_replace, str_insert, insert, delete, rename")