feat: anthropic tools for claude sonnet 4.5 (#4988)
* add anthropic memory tools * memory view working * update memory examples * tools * feat: some changes (#5003) * feat: added the ability to modify and add descriptions on creation * fix: kill dead code & write into core_tool_executor instead * fix: use block_manager not agent_manager where possible, also turn the return string errors into raising exceptions * fix: cleanup, get rid of more return string errors replaced with valueerror, also drop deadcode --------- Co-authored-by: Charles Packer <packercharles@gmail.com>
This commit is contained in:
@@ -89,7 +89,7 @@ SEND_MESSAGE_TOOL_NAME = "send_message"
|
||||
BASE_TOOLS = [SEND_MESSAGE_TOOL_NAME, "conversation_search", "archival_memory_insert", "archival_memory_search"]
|
||||
DEPRECATED_LETTA_TOOLS = ["archival_memory_insert", "archival_memory_search"]
|
||||
# Base memory tools CAN be edited, and are added by default by the server
|
||||
BASE_MEMORY_TOOLS = ["core_memory_append", "core_memory_replace"]
|
||||
BASE_MEMORY_TOOLS = ["core_memory_append", "core_memory_replace", "memory"]
|
||||
# New v2 collection of the base memory tools (effecitvely same as sleeptime set), to pair with memgpt_v2 prompt
|
||||
BASE_MEMORY_TOOLS_V2 = [
|
||||
"memory_replace",
|
||||
@@ -98,6 +98,11 @@ BASE_MEMORY_TOOLS_V2 = [
|
||||
# "memory_rethink",
|
||||
# "memory_finish_edits",
|
||||
]
|
||||
|
||||
# v3 collection, currently just a omni memory tool for anthropic
|
||||
BASE_MEMORY_TOOLS_V3 = [
|
||||
"memory",
|
||||
]
|
||||
# Base tools if the memgpt agent has enable_sleeptime on
|
||||
BASE_SLEEPTIME_CHAT_TOOLS = [SEND_MESSAGE_TOOL_NAME, "conversation_search", "archival_memory_search"]
|
||||
# Base memory tools for sleeptime agent
|
||||
@@ -118,6 +123,7 @@ BASE_VOICE_SLEEPTIME_TOOLS = [
|
||||
"rethink_user_memory",
|
||||
"finish_rethinking_memory",
|
||||
]
|
||||
|
||||
# Multi agent tools
|
||||
MULTI_AGENT_TOOLS = ["send_message_to_agent_and_wait_for_reply", "send_message_to_agents_matching_tags", "send_message_to_agent_async"]
|
||||
LOCAL_ONLY_MULTI_AGENT_TOOLS = ["send_message_to_agent_async"]
|
||||
|
||||
@@ -1,7 +1,82 @@
|
||||
from typing import List, Literal, Optional
|
||||
from typing import TYPE_CHECKING, Any, List, Literal, Optional
|
||||
|
||||
from letta.constants import CORE_MEMORY_LINE_NUMBER_WARNING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from letta.schemas.agent import AgentState
|
||||
|
||||
|
||||
def memory(
|
||||
agent_state: "AgentState",
|
||||
command: str,
|
||||
path: Optional[str] = None,
|
||||
file_text: Optional[str] = None,
|
||||
description: Optional[str] = None,
|
||||
old_str: Optional[str] = None,
|
||||
new_str: Optional[str] = None,
|
||||
insert_line: Optional[int] = None,
|
||||
insert_text: Optional[str] = None,
|
||||
old_path: Optional[str] = None,
|
||||
new_path: Optional[str] = None,
|
||||
) -> Optional[str]:
|
||||
"""
|
||||
Memory management tool with various sub-commands for memory block operations.
|
||||
|
||||
Args:
|
||||
command (str): The sub-command to execute. Supported commands:
|
||||
- "view": List memory blocks or view specific block content
|
||||
- "create": Create a new memory block
|
||||
- "str_replace": Replace text in a memory block
|
||||
- "insert": Insert text at a specific line in a memory block
|
||||
- "delete": Delete a memory block
|
||||
- "rename": Rename a memory block
|
||||
path (Optional[str]): Path to the memory block (for str_replace, insert, delete)
|
||||
file_text (Optional[str]): The value to set in the memory block (for create)
|
||||
description (Optional[str]): The description to set in the memory block (for create, rename)
|
||||
old_str (Optional[str]): Old text to replace (for str_replace)
|
||||
new_str (Optional[str]): New text to replace with (for str_replace)
|
||||
insert_line (Optional[int]): Line number to insert at (for insert)
|
||||
insert_text (Optional[str]): Text to insert (for insert)
|
||||
old_path (Optional[str]): Old path for rename operation
|
||||
new_path (Optional[str]): New path for rename operation
|
||||
view_range (Optional[int]): Range of lines to view (for view)
|
||||
|
||||
Returns:
|
||||
Optional[str]: Success message or error description
|
||||
|
||||
Examples:
|
||||
# List all memory blocks
|
||||
memory(agent_state, "view", path="/memories")
|
||||
|
||||
# View specific memory block content
|
||||
memory(agent_state, "view", path="/memories/user_preferences")
|
||||
|
||||
# View first 10 lines of a memory block
|
||||
memory(agent_state, "view", path="/memories/user_preferences", view_range=10)
|
||||
|
||||
# Replace text in a memory block
|
||||
memory(agent_state, "str_replace", path="/memories/user_preferences", old_str="theme: dark", new_str="theme: light")
|
||||
|
||||
# Insert text at line 5
|
||||
memory(agent_state, "insert", path="/memories/notes", insert_line=5, insert_text="New note here")
|
||||
|
||||
# Delete a memory block
|
||||
memory(agent_state, "delete", path="/memories/old_notes")
|
||||
|
||||
# Rename a memory block
|
||||
memory(agent_state, "rename", old_path="/memories/temp", new_path="/memories/permanent")
|
||||
|
||||
# Update the description of a memory block
|
||||
memory(agent_state, "rename", path="/memories/temp", description="The user's temporary notes.")
|
||||
|
||||
# Create a memory block with starting text
|
||||
memory(agent_state, "create", path="/memories/coding_preferences", "description": "The user's coding preferences.", "file_text": "The user seems to add type hints to all of their Python code.")
|
||||
|
||||
# Create an empty memory block
|
||||
memory(agent_state, "create", path="/memories/coding_preferences", "description": "The user's coding preferences.")
|
||||
"""
|
||||
raise NotImplementedError("This should never be invoked directly. Contact Letta if you see this error message.")
|
||||
|
||||
|
||||
def send_message(self: "Agent", message: str) -> Optional[str]:
|
||||
"""
|
||||
@@ -201,7 +276,10 @@ def rethink_memory(agent_state: "AgentState", new_memory: str, target_block_labe
|
||||
"""
|
||||
|
||||
if agent_state.memory.get_block(target_block_label) is None:
|
||||
agent_state.memory.create_block(label=target_block_label, value=new_memory)
|
||||
from letta.schemas.block import Block
|
||||
|
||||
new_block = Block(label=target_block_label, value=new_memory)
|
||||
agent_state.memory.set_block(new_block)
|
||||
|
||||
agent_state.memory.update_block_value(label=target_block_label, value=new_memory)
|
||||
return None
|
||||
@@ -394,7 +472,10 @@ def memory_rethink(agent_state: "AgentState", label: str, new_memory: str) -> No
|
||||
)
|
||||
|
||||
if agent_state.memory.get_block(label) is None:
|
||||
agent_state.memory.create_block(label=label, value=new_memory)
|
||||
from letta.schemas.block import Block
|
||||
|
||||
new_block = Block(label=label, value=new_memory)
|
||||
agent_state.memory.set_block(new_block)
|
||||
|
||||
agent_state.memory.update_block_value(label=label, value=new_memory)
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||||
from letta.constants import (
|
||||
BASE_MEMORY_TOOLS,
|
||||
BASE_MEMORY_TOOLS_V2,
|
||||
BASE_MEMORY_TOOLS_V3,
|
||||
BASE_SLEEPTIME_CHAT_TOOLS,
|
||||
BASE_SLEEPTIME_TOOLS,
|
||||
BASE_TOOLS,
|
||||
@@ -439,7 +440,7 @@ class AgentManager:
|
||||
for tn in tool_names:
|
||||
if tn in {"send_message", "send_message_to_agent_async", "memory_finish_edits"}:
|
||||
tool_rules.append(TerminalToolRule(tool_name=tn))
|
||||
elif tn in (BASE_TOOLS + BASE_MEMORY_TOOLS + BASE_MEMORY_TOOLS_V2 + BASE_SLEEPTIME_TOOLS):
|
||||
elif tn in (BASE_TOOLS + BASE_MEMORY_TOOLS + BASE_MEMORY_TOOLS_V2 + BASE_MEMORY_TOOLS_V3 + BASE_SLEEPTIME_TOOLS):
|
||||
tool_rules.append(ContinueToolRule(tool_name=tn))
|
||||
|
||||
for tool_with_requires_approval in requires_approval:
|
||||
@@ -1365,19 +1366,22 @@ class AgentManager:
|
||||
)
|
||||
if new_memory_str not in system_message.content[0].text:
|
||||
# update the blocks (LRW) in the DB
|
||||
for label in agent_state.memory.list_block_labels():
|
||||
updated_value = new_memory.get_block(label).value
|
||||
if updated_value != agent_state.memory.get_block(label).value:
|
||||
# update the block if it's changed
|
||||
block_id = agent_state.memory.get_block(label).id
|
||||
await self.block_manager.update_block_async(
|
||||
block_id=block_id, block_update=BlockUpdate(value=updated_value), actor=actor
|
||||
)
|
||||
for label in new_memory.list_block_labels():
|
||||
if label in agent_state.memory.list_block_labels():
|
||||
# Block exists in both old and new memory - check if value changed
|
||||
updated_value = new_memory.get_block(label).value
|
||||
if updated_value != agent_state.memory.get_block(label).value:
|
||||
# update the block if it's changed
|
||||
block_id = agent_state.memory.get_block(label).id
|
||||
await self.block_manager.update_block_async(
|
||||
block_id=block_id, block_update=BlockUpdate(value=updated_value), actor=actor
|
||||
)
|
||||
|
||||
# refresh memory from DB (using block ids)
|
||||
blocks = await self.block_manager.get_all_blocks_by_ids_async(
|
||||
block_ids=[b.id for b in agent_state.memory.get_blocks()], actor=actor
|
||||
)
|
||||
# Note: New blocks are already persisted in the creation methods,
|
||||
# so we don't need to handle them here
|
||||
|
||||
# refresh memory from DB (using block ids from the new memory)
|
||||
blocks = await self.block_manager.get_all_blocks_by_ids_async(block_ids=[b.id for b in new_memory.get_blocks()], actor=actor)
|
||||
|
||||
agent_state.memory = Memory(
|
||||
blocks=blocks,
|
||||
|
||||
@@ -11,6 +11,7 @@ from letta.constants import (
|
||||
from letta.helpers.json_helpers import json_dumps
|
||||
from letta.log import get_logger
|
||||
from letta.schemas.agent import AgentState
|
||||
from letta.schemas.block import BlockUpdate
|
||||
from letta.schemas.enums import MessageRole, TagMatchMode
|
||||
from letta.schemas.sandbox_config import SandboxConfig
|
||||
from letta.schemas.tool import Tool
|
||||
@@ -46,8 +47,11 @@ class LettaCoreToolExecutor(ToolExecutor):
|
||||
"core_memory_replace": self.core_memory_replace,
|
||||
"memory_replace": self.memory_replace,
|
||||
"memory_insert": self.memory_insert,
|
||||
"memory_str_replace": self.memory_str_replace,
|
||||
"memory_str_insert": self.memory_str_insert,
|
||||
"memory_rethink": self.memory_rethink,
|
||||
"memory_finish_edits": self.memory_finish_edits,
|
||||
"memory": self.memory,
|
||||
}
|
||||
|
||||
if function_name not in function_map:
|
||||
@@ -479,8 +483,14 @@ class LettaCoreToolExecutor(ToolExecutor):
|
||||
"are for display purposes only)."
|
||||
)
|
||||
|
||||
if agent_state.memory.get_block(label) is None:
|
||||
agent_state.memory.create_block(label=label, value=new_memory)
|
||||
try:
|
||||
agent_state.memory.get_block(label)
|
||||
except KeyError:
|
||||
# Block doesn't exist, create it
|
||||
from letta.schemas.block import Block
|
||||
|
||||
new_block = Block(label=label, value=new_memory)
|
||||
agent_state.memory.set_block(new_block)
|
||||
|
||||
agent_state.memory.update_block_value(label=label, value=new_memory)
|
||||
|
||||
@@ -502,3 +512,388 @@ class LettaCoreToolExecutor(ToolExecutor):
|
||||
|
||||
async def memory_finish_edits(self, agent_state: AgentState, actor: User) -> None:
|
||||
return None
|
||||
|
||||
async def memory_delete(self, agent_state: AgentState, actor: User, path: str) -> str:
|
||||
"""Delete a memory block by detaching it from the agent."""
|
||||
# Extract memory block label from path
|
||||
label = path.removeprefix("/memories/").replace("/", "_")
|
||||
|
||||
try:
|
||||
# Check if memory block exists
|
||||
memory_block = agent_state.memory.get_block(label)
|
||||
if memory_block is None:
|
||||
raise ValueError(f"Error: Memory block '{label}' does not exist")
|
||||
|
||||
# Detach the block from the agent
|
||||
updated_agent_state = await self.agent_manager.detach_block_async(
|
||||
agent_id=agent_state.id, block_id=memory_block.id, actor=actor
|
||||
)
|
||||
|
||||
# Update the agent state with the updated memory from the database
|
||||
agent_state.memory = updated_agent_state.memory
|
||||
|
||||
return f"Successfully deleted memory block '{label}'"
|
||||
|
||||
except Exception as e:
|
||||
return f"Error performing delete: {str(e)}"
|
||||
|
||||
async def memory_update_description(self, agent_state: AgentState, actor: User, path: str, description: str) -> str:
|
||||
"""Update the description of a memory block."""
|
||||
label = path.removeprefix("/memories/").replace("/", "_")
|
||||
|
||||
try:
|
||||
# Check if old memory block exists
|
||||
memory_block = agent_state.memory.get_block(label)
|
||||
if memory_block is None:
|
||||
raise ValueError(f"Error: Memory block '{label}' does not exist")
|
||||
|
||||
await self.block_manager.update_block_async(
|
||||
block_id=memory_block.id, block_update=BlockUpdate(description=description), actor=actor
|
||||
)
|
||||
await self.agent_manager.rebuild_system_prompt_async(agent_id=agent_state.id, actor=actor, force=True)
|
||||
|
||||
return f"Successfully updated description of memory block '{label}'"
|
||||
|
||||
except Exception as e:
|
||||
raise Exception(f"Error performing update_description: {str(e)}")
|
||||
|
||||
async def memory_rename(self, agent_state: AgentState, actor: User, old_path: str, new_path: str) -> str:
|
||||
"""Rename a memory block by copying content to new label and detaching old one."""
|
||||
# Extract memory block labels from paths
|
||||
old_label = old_path.removeprefix("/memories/").replace("/", "_")
|
||||
new_label = new_path.removeprefix("/memories/").replace("/", "_")
|
||||
|
||||
try:
|
||||
# Check if old memory block exists
|
||||
memory_block = agent_state.memory.get_block(old_label)
|
||||
if memory_block is None:
|
||||
raise ValueError(f"Error: Memory block '{old_label}' does not exist")
|
||||
|
||||
await self.block_manager.update_block_async(block_id=memory_block.id, block_update=BlockUpdate(label=new_label), actor=actor)
|
||||
await self.agent_manager.rebuild_system_prompt_async(agent_id=agent_state.id, actor=actor, force=True)
|
||||
|
||||
return f"Successfully renamed memory block '{old_label}' to '{new_label}'"
|
||||
|
||||
except Exception as e:
|
||||
raise Exception(f"Error performing rename: {str(e)}")
|
||||
|
||||
async def memory_view(self, agent_state: AgentState, actor: User, path: str, view_range: Optional[int] = None) -> str:
|
||||
"""View the content of a memory block with optional line range."""
|
||||
try:
|
||||
# Special case: if path is "/memories", list all blocks
|
||||
if path == "/memories":
|
||||
blocks = agent_state.memory.get_blocks()
|
||||
|
||||
if not blocks:
|
||||
raise ValueError("No memory blocks found.")
|
||||
|
||||
result_lines = [f"Found {len(blocks)} memory block(s):\n"]
|
||||
|
||||
for i, block in enumerate(blocks, 1):
|
||||
content = str(block.value)
|
||||
content_length = len(content)
|
||||
line_count = len(content.split("\n")) if content else 0
|
||||
|
||||
# Basic info
|
||||
block_info = [f"{i}. {block.label}"]
|
||||
|
||||
# Add description if available
|
||||
if block.description:
|
||||
block_info.append(f" Description: {block.description}")
|
||||
|
||||
# Add read-only status
|
||||
if block.read_only:
|
||||
block_info.append(" Read-only: true")
|
||||
|
||||
# Add content stats
|
||||
block_info.append(f" Character limit: {block.limit}")
|
||||
block_info.append(f" Current length: {content_length} characters")
|
||||
block_info.append(f" Lines: {line_count}")
|
||||
|
||||
# Add content preview (first 100 characters)
|
||||
if content:
|
||||
preview = content[:100].replace("\n", "\\n")
|
||||
if len(content) > 100:
|
||||
preview += "..."
|
||||
block_info.append(f" Preview: {preview}")
|
||||
else:
|
||||
block_info.append(" Preview: (empty)")
|
||||
|
||||
result_lines.append("\n".join(block_info))
|
||||
if i < len(blocks): # Add separator between blocks
|
||||
result_lines.append("")
|
||||
|
||||
return "\n".join(result_lines)
|
||||
|
||||
# Extract memory block label from path (e.g., "/memories/preferences.txt" -> "preferences.txt")
|
||||
if path.startswith("/memories/"):
|
||||
label = path[10:] # Remove "/memories/" prefix
|
||||
else:
|
||||
label = path
|
||||
|
||||
# Get the memory block
|
||||
memory_block = agent_state.memory.get_block(label)
|
||||
if memory_block is None:
|
||||
raise ValueError(f"Error: Memory block '{label}' does not exist")
|
||||
|
||||
# Get the content
|
||||
content = str(memory_block.value)
|
||||
if not content:
|
||||
raise ValueError(f"Memory block '{label}' is empty")
|
||||
|
||||
# Split content into lines
|
||||
lines = content.split("\n")
|
||||
total_lines = len(lines)
|
||||
|
||||
# Handle view_range parameter
|
||||
if view_range is not None:
|
||||
if view_range <= 0:
|
||||
raise ValueError(f"Error: view_range must be positive, got {view_range}")
|
||||
|
||||
# Show only the first view_range lines
|
||||
lines_to_show = lines[:view_range]
|
||||
range_info = f" (showing first {view_range} of {total_lines} lines)"
|
||||
else:
|
||||
lines_to_show = lines
|
||||
range_info = f" ({total_lines} lines total)"
|
||||
|
||||
# Format output with line numbers
|
||||
numbered_lines = []
|
||||
for i, line in enumerate(lines_to_show, start=1):
|
||||
numbered_lines.append(f"Line {i}: {line}")
|
||||
|
||||
numbered_content = "\n".join(numbered_lines)
|
||||
|
||||
# Add metadata information
|
||||
metadata_info = []
|
||||
if memory_block.description:
|
||||
metadata_info.append(f"Description: {memory_block.description}")
|
||||
if memory_block.read_only:
|
||||
metadata_info.append("Read-only: true")
|
||||
metadata_info.append(f"Character limit: {memory_block.limit}")
|
||||
metadata_info.append(f"Current length: {len(content)} characters")
|
||||
|
||||
metadata_str = "\n".join(metadata_info)
|
||||
|
||||
result = f"Memory block: {label}{range_info}\n"
|
||||
result += f"Metadata:\n{metadata_str}\n\n"
|
||||
result += f"Content:\n{numbered_content}"
|
||||
|
||||
return result
|
||||
|
||||
except KeyError:
|
||||
raise ValueError(f"Error: Memory block '{label}' does not exist")
|
||||
except Exception as e:
|
||||
raise Exception(f"Error viewing memory block: {str(e)}")
|
||||
|
||||
async def memory_create(
|
||||
self, agent_state: AgentState, actor: User, path: str, description: str, file_text: Optional[str] = None
|
||||
) -> str:
|
||||
"""Create a memory block by setting its value to an empty string."""
|
||||
from letta.schemas.block import Block
|
||||
|
||||
label = path.removeprefix("/memories/").replace("/", "_")
|
||||
|
||||
# Create a new block and persist it to the database
|
||||
new_block = Block(label=label, value=file_text if file_text else "", description=description)
|
||||
persisted_block = await self.block_manager.create_or_update_block_async(new_block, actor)
|
||||
|
||||
# Attach the block to the agent
|
||||
await self.agent_manager.attach_block_async(agent_id=agent_state.id, block_id=persisted_block.id, actor=actor)
|
||||
|
||||
# Add the persisted block to memory
|
||||
agent_state.memory.set_block(persisted_block)
|
||||
|
||||
await self.agent_manager.update_memory_if_changed_async(agent_id=agent_state.id, new_memory=agent_state.memory, actor=actor)
|
||||
return f"Successfully created memory block '{label}'"
|
||||
|
||||
async def memory_str_replace(self, agent_state: AgentState, actor: User, path: str, old_str: str, new_str: str) -> str:
|
||||
"""Replace text in a memory block."""
|
||||
label = path.removeprefix("/memories/").replace("/", "_")
|
||||
|
||||
memory_block = agent_state.memory.get_block(label)
|
||||
if memory_block is None:
|
||||
raise ValueError(f"Error: Memory block '{label}' does not exist")
|
||||
|
||||
if memory_block.read_only:
|
||||
raise ValueError(f"{READ_ONLY_BLOCK_EDIT_ERROR}")
|
||||
|
||||
if bool(MEMORY_TOOLS_LINE_NUMBER_PREFIX_REGEX.search(old_str)):
|
||||
raise ValueError(
|
||||
"old_str contains a line number prefix, which is not allowed. "
|
||||
"Do not include line numbers when calling memory tools (line "
|
||||
"numbers are for display purposes only)."
|
||||
)
|
||||
if CORE_MEMORY_LINE_NUMBER_WARNING in old_str:
|
||||
raise ValueError(
|
||||
"old_str contains a line number warning, which is not allowed. "
|
||||
"Do not include line number information when calling memory tools "
|
||||
"(line numbers are for display purposes only)."
|
||||
)
|
||||
if bool(MEMORY_TOOLS_LINE_NUMBER_PREFIX_REGEX.search(new_str)):
|
||||
raise ValueError(
|
||||
"new_str contains a line number prefix, which is not allowed. "
|
||||
"Do not include line numbers when calling memory tools (line "
|
||||
"numbers are for display purposes only)."
|
||||
)
|
||||
|
||||
old_str = str(old_str).expandtabs()
|
||||
new_str = str(new_str).expandtabs()
|
||||
current_value = str(memory_block.value).expandtabs()
|
||||
|
||||
# Check if old_str is unique in the block
|
||||
occurences = current_value.count(old_str)
|
||||
if occurences == 0:
|
||||
raise ValueError(
|
||||
f"No replacement was performed, old_str `{old_str}` did not appear verbatim in memory block with label `{label}`."
|
||||
)
|
||||
elif occurences > 1:
|
||||
content_value_lines = current_value.split("\n")
|
||||
lines = [idx + 1 for idx, line in enumerate(content_value_lines) if old_str in line]
|
||||
raise ValueError(
|
||||
f"No replacement was performed. Multiple occurrences of old_str `{old_str}` in lines {lines}. Please ensure it is unique."
|
||||
)
|
||||
|
||||
# Replace old_str with new_str
|
||||
new_value = current_value.replace(str(old_str), str(new_str))
|
||||
|
||||
# Write the new content to the block
|
||||
await self.block_manager.update_block_async(block_id=memory_block.id, block_update=BlockUpdate(value=new_value), actor=actor)
|
||||
await self.agent_manager.rebuild_system_prompt_async(agent_id=agent_state.id, actor=actor, force=True)
|
||||
|
||||
# Prepare the success message
|
||||
success_msg = f"The core memory block with label `{label}` has been edited. "
|
||||
success_msg += (
|
||||
"Review the changes and make sure they are as expected (correct indentation, "
|
||||
"no duplicate lines, etc). Edit the memory block again if necessary."
|
||||
)
|
||||
|
||||
return success_msg
|
||||
|
||||
async def memory_str_insert(self, agent_state: AgentState, actor: User, path: str, insert_text: str, insert_line: int = -1) -> str:
|
||||
"""Insert text into a memory block at a specific line."""
|
||||
label = path.removeprefix("/memories/").replace("/", "_")
|
||||
|
||||
memory_block = agent_state.memory.get_block(label)
|
||||
if memory_block is None:
|
||||
raise ValueError(f"Error: Memory block '{label}' does not exist")
|
||||
|
||||
if memory_block.read_only:
|
||||
raise ValueError(f"{READ_ONLY_BLOCK_EDIT_ERROR}")
|
||||
|
||||
if bool(MEMORY_TOOLS_LINE_NUMBER_PREFIX_REGEX.search(insert_text)):
|
||||
raise ValueError(
|
||||
"insert_text contains a line number prefix, which is not allowed. "
|
||||
"Do not include line numbers when calling memory tools (line "
|
||||
"numbers are for display purposes only)."
|
||||
)
|
||||
if CORE_MEMORY_LINE_NUMBER_WARNING in insert_text:
|
||||
raise ValueError(
|
||||
"insert_text contains a line number warning, which is not allowed. "
|
||||
"Do not include line number information when calling memory tools "
|
||||
"(line numbers are for display purposes only)."
|
||||
)
|
||||
|
||||
current_value = str(memory_block.value).expandtabs()
|
||||
insert_text = str(insert_text).expandtabs()
|
||||
current_value_lines = current_value.split("\n")
|
||||
n_lines = len(current_value_lines)
|
||||
|
||||
# Check if we're in range, from 0 (pre-line), to 1 (first line), to n_lines (last line)
|
||||
if insert_line == -1:
|
||||
insert_line = n_lines
|
||||
elif insert_line < 0 or insert_line > n_lines:
|
||||
raise ValueError(
|
||||
f"Invalid `insert_line` parameter: {insert_line}. It should be within "
|
||||
f"the range of lines of the memory block: {[0, n_lines]}, or -1 to "
|
||||
f"append to the end of the memory block."
|
||||
)
|
||||
|
||||
# Insert the new text as a line
|
||||
SNIPPET_LINES = 3
|
||||
insert_text_lines = insert_text.split("\n")
|
||||
new_value_lines = current_value_lines[:insert_line] + insert_text_lines + current_value_lines[insert_line:]
|
||||
snippet_lines = (
|
||||
current_value_lines[max(0, insert_line - SNIPPET_LINES) : insert_line]
|
||||
+ insert_text_lines
|
||||
+ current_value_lines[insert_line : insert_line + SNIPPET_LINES]
|
||||
)
|
||||
|
||||
# Collate into the new value to update
|
||||
new_value = "\n".join(new_value_lines)
|
||||
snippet = "\n".join(snippet_lines)
|
||||
|
||||
# Write into the block
|
||||
await self.block_manager.update_block_async(block_id=memory_block.id, block_update=BlockUpdate(value=new_value), actor=actor)
|
||||
await self.agent_manager.rebuild_system_prompt_async(agent_id=agent_state.id, actor=actor, force=True)
|
||||
|
||||
# Prepare the success message
|
||||
success_msg = f"The core memory block with label `{label}` has been edited. "
|
||||
success_msg += (
|
||||
"Review the changes and make sure they are as expected (correct indentation, "
|
||||
"no duplicate lines, etc). Edit the memory block again if necessary."
|
||||
)
|
||||
|
||||
return success_msg
|
||||
|
||||
async def memory(
|
||||
self,
|
||||
agent_state: AgentState,
|
||||
actor: User,
|
||||
command: str,
|
||||
file_text: Optional[str] = None,
|
||||
description: Optional[str] = None,
|
||||
path: Optional[str] = None,
|
||||
old_str: Optional[str] = None,
|
||||
new_str: Optional[str] = None,
|
||||
insert_line: Optional[int] = None,
|
||||
insert_text: Optional[str] = None,
|
||||
old_path: Optional[str] = None,
|
||||
new_path: Optional[str] = None,
|
||||
view_range: Optional[int] = None,
|
||||
) -> Optional[str]:
|
||||
if command == "view":
|
||||
if path is None:
|
||||
raise ValueError("Error: path is required for view command")
|
||||
return await self.memory_view(agent_state, actor, path, view_range)
|
||||
|
||||
elif command == "create":
|
||||
if path is None:
|
||||
raise ValueError("Error: path is required for create command")
|
||||
if description is None:
|
||||
raise ValueError("Error: description is required for create command")
|
||||
return await self.memory_create(agent_state, actor, path, description, file_text)
|
||||
|
||||
elif command == "str_replace":
|
||||
if path is None:
|
||||
raise ValueError("Error: path is required for str_replace command")
|
||||
if old_str is None:
|
||||
raise ValueError("Error: old_str is required for str_replace command")
|
||||
if new_str is None:
|
||||
raise ValueError("Error: new_str is required for str_replace command")
|
||||
return await self.memory_str_replace(agent_state, actor, path, old_str, new_str)
|
||||
|
||||
elif command == "insert":
|
||||
if path is None:
|
||||
raise ValueError("Error: path is required for insert command")
|
||||
if insert_text is None:
|
||||
raise ValueError("Error: insert_text is required for insert command")
|
||||
return await self.memory_str_insert(agent_state, actor, path, insert_text, insert_line)
|
||||
|
||||
elif command == "delete":
|
||||
if path is None:
|
||||
raise ValueError("Error: path is required for delete command")
|
||||
return await self.memory_delete(agent_state, actor, path)
|
||||
|
||||
elif command == "rename":
|
||||
if path and description:
|
||||
return await self.memory_update_description(agent_state, actor, path, description)
|
||||
elif old_path and new_path:
|
||||
return await self.memory_rename(agent_state, actor, old_path, new_path)
|
||||
else:
|
||||
raise ValueError(
|
||||
"Error: path and description are required for update_description command, or old_path and new_path are required for rename command"
|
||||
)
|
||||
|
||||
else:
|
||||
raise ValueError(f"Error: Unknown command '{command}'. Supported commands: str_replace, str_insert, insert, delete, rename")
|
||||
|
||||
Reference in New Issue
Block a user