diff --git a/letta/services/tool_executor/tool_execution_manager.py b/letta/services/tool_executor/tool_execution_manager.py index a7c82c9d..fcc96759 100644 --- a/letta/services/tool_executor/tool_execution_manager.py +++ b/letta/services/tool_executor/tool_execution_manager.py @@ -12,7 +12,6 @@ from letta.services.tool_executor.tool_executor import ( ExternalComposioToolExecutor, ExternalMCPToolExecutor, LettaCoreToolExecutor, - LettaMemoryToolExecutor, LettaMultiAgentToolExecutor, SandboxToolExecutor, ToolExecutor, @@ -26,8 +25,9 @@ class ToolExecutorFactory: _executor_map: Dict[ToolType, Type[ToolExecutor]] = { ToolType.LETTA_CORE: LettaCoreToolExecutor, + ToolType.LETTA_MEMORY_CORE: LettaCoreToolExecutor, + ToolType.LETTA_SLEEPTIME_CORE: LettaCoreToolExecutor, ToolType.LETTA_MULTI_AGENT_CORE: LettaMultiAgentToolExecutor, - ToolType.LETTA_MEMORY_CORE: LettaMemoryToolExecutor, ToolType.EXTERNAL_COMPOSIO: ExternalComposioToolExecutor, ToolType.EXTERNAL_MCP: ExternalMCPToolExecutor, } @@ -35,13 +35,8 @@ class ToolExecutorFactory: @classmethod def get_executor(cls, tool_type: ToolType) -> ToolExecutor: """Get the appropriate executor for the given tool type.""" - executor_class = cls._executor_map.get(tool_type) - - if executor_class: - return executor_class() - - # Default to sandbox executor for unknown types - return SandboxToolExecutor() + executor_class = cls._executor_map.get(tool_type, SandboxToolExecutor) + return executor_class() class ToolExecutionManager: diff --git a/letta/services/tool_executor/tool_executor.py b/letta/services/tool_executor/tool_executor.py index f68b700c..a51b5e3c 100644 --- a/letta/services/tool_executor/tool_executor.py +++ b/letta/services/tool_executor/tool_executor.py @@ -3,7 +3,7 @@ import traceback from abc import ABC, abstractmethod from typing import Any, Dict, Optional -from letta.constants import COMPOSIO_ENTITY_ENV_VAR_KEY, RETRIEVAL_QUERY_DEFAULT_PAGE_SIZE +from letta.constants import COMPOSIO_ENTITY_ENV_VAR_KEY, CORE_MEMORY_LINE_NUMBER_WARNING, RETRIEVAL_QUERY_DEFAULT_PAGE_SIZE from letta.functions.ast_parsers import coerce_dict_args_by_annotations, get_function_annotations_from_source from letta.functions.helpers import execute_composio_action, generate_composio_action_from_func_name from letta.helpers.composio_helpers import get_composio_api_key @@ -58,6 +58,12 @@ class LettaCoreToolExecutor(ToolExecutor): "conversation_search": self.conversation_search, "archival_memory_search": self.archival_memory_search, "archival_memory_insert": self.archival_memory_insert, + "core_memory_append": self.core_memory_append, + "core_memory_replace": self.core_memory_replace, + "memory_replace": self.memory_replace, + "memory_insert": self.memory_insert, + "memory_rethink": self.memory_rethink, + "memory_finish_edits": self.memory_finish_edits, } if function_name not in function_map: @@ -186,53 +192,7 @@ class LettaCoreToolExecutor(ToolExecutor): AgentManager().rebuild_system_prompt(agent_id=agent_state.id, actor=actor, force=True) return None - -class LettaMultiAgentToolExecutor(ToolExecutor): - """Executor for LETTA multi-agent core tools.""" - - # TODO: Implement - # def execute(self, function_name: str, function_args: dict, agent: "Agent", tool: Tool) -> ToolExecutionResult: - # callable_func = get_function_from_module(LETTA_MULTI_AGENT_TOOL_MODULE_NAME, function_name) - # function_args["self"] = agent # need to attach self to arg since it's dynamically linked - # function_response = callable_func(**function_args) - # return ToolExecutionResult(func_return=function_response) - - -class LettaMemoryToolExecutor(ToolExecutor): - """Executor for LETTA memory core tools with direct implementation.""" - - def execute( - self, - function_name: str, - function_args: dict, - agent_state: AgentState, - tool: Tool, - actor: User, - sandbox_config: Optional[SandboxConfig] = None, - sandbox_env_vars: Optional[Dict[str, Any]] = None, - ) -> ToolExecutionResult: - # Map function names to method calls - function_map = { - "core_memory_append": self.core_memory_append, - "core_memory_replace": self.core_memory_replace, - } - - if function_name not in function_map: - raise ValueError(f"Unknown function: {function_name}") - - # Execute the appropriate function with the copied state - function_args_copy = function_args.copy() # Make a copy to avoid modifying the original - function_response = function_map[function_name](agent_state, **function_args_copy) - - # Update memory if changed - AgentManager().update_memory_if_changed(agent_id=agent_state.id, new_memory=agent_state.memory, actor=actor) - - return ToolExecutionResult( - status="success", - func_return=function_response, - ) - - def core_memory_append(self, agent_state: "AgentState", label: str, content: str) -> Optional[str]: + def core_memory_append(self, agent_state: "AgentState", actor: User, label: str, content: str) -> Optional[str]: """ Append to the contents of core memory. @@ -246,9 +206,17 @@ class LettaMemoryToolExecutor(ToolExecutor): current_value = str(agent_state.memory.get_block(label).value) new_value = current_value + "\n" + str(content) agent_state.memory.update_block_value(label=label, value=new_value) + AgentManager().update_memory_if_changed(agent_id=agent_state.id, new_memory=agent_state.memory, actor=actor) return None - def core_memory_replace(self, agent_state: "AgentState", label: str, old_content: str, new_content: str) -> Optional[str]: + def core_memory_replace( + self, + agent_state: "AgentState", + actor: User, + label: str, + old_content: str, + new_content: str, + ) -> Optional[str]: """ Replace the contents of core memory. To delete memories, use an empty string for new_content. @@ -265,8 +233,253 @@ class LettaMemoryToolExecutor(ToolExecutor): raise ValueError(f"Old content '{old_content}' not found in memory block '{label}'") new_value = current_value.replace(str(old_content), str(new_content)) agent_state.memory.update_block_value(label=label, value=new_value) + AgentManager().update_memory_if_changed(agent_id=agent_state.id, new_memory=agent_state.memory, actor=actor) return None + def memory_replace( + agent_state: "AgentState", + actor: User, + label: str, + old_str: str, + new_str: Optional[str] = None, + ) -> str: + """ + The memory_replace command allows you to replace a specific string in a memory + block with a new string. This is used for making precise edits. + + Args: + label (str): Section of the memory to be edited, identified by its label. + old_str (str): The text to replace (must match exactly, including whitespace + and indentation). + new_str (Optional[str]): The new text to insert in place of the old text. + Omit this argument to delete the old_str. + + Returns: + str: The success message + """ + import re + + if bool(re.search(r"\nLine \d+: ", old_str)): + raise ValueError( + "old_str contains a line number prefix, which is not allowed. " + "Do not include line numbers when calling memory tools (line " + "numbers are for display purposes only)." + ) + if CORE_MEMORY_LINE_NUMBER_WARNING in old_str: + raise ValueError( + "old_str contains a line number warning, which is not allowed. " + "Do not include line number information when calling memory tools " + "(line numbers are for display purposes only)." + ) + if bool(re.search(r"\nLine \d+: ", new_str)): + raise ValueError( + "new_str contains a line number prefix, which is not allowed. " + "Do not include line numbers when calling memory tools (line " + "numbers are for display purposes only)." + ) + + old_str = str(old_str).expandtabs() + new_str = str(new_str).expandtabs() + current_value = str(agent_state.memory.get_block(label).value).expandtabs() + + # Check if old_str is unique in the block + occurences = current_value.count(old_str) + if occurences == 0: + raise ValueError( + f"No replacement was performed, old_str `{old_str}` did not appear " f"verbatim in memory block with label `{label}`." + ) + elif occurences > 1: + content_value_lines = current_value.split("\n") + lines = [idx + 1 for idx, line in enumerate(content_value_lines) if old_str in line] + raise ValueError( + f"No replacement was performed. Multiple occurrences of " + f"old_str `{old_str}` in lines {lines}. Please ensure it is unique." + ) + + # Replace old_str with new_str + new_value = current_value.replace(str(old_str), str(new_str)) + + # Write the new content to the block + agent_state.memory.update_block_value(label=label, value=new_value) + + AgentManager().update_memory_if_changed(agent_id=agent_state.id, new_memory=agent_state.memory, actor=actor) + + # Create a snippet of the edited section + SNIPPET_LINES = 3 + replacement_line = current_value.split(old_str)[0].count("\n") + start_line = max(0, replacement_line - SNIPPET_LINES) + end_line = replacement_line + SNIPPET_LINES + new_str.count("\n") + snippet = "\n".join(new_value.split("\n")[start_line : end_line + 1]) + + # Prepare the success message + success_msg = f"The core memory block with label `{label}` has been edited. " + # success_msg += self._make_output( + # snippet, f"a snippet of {path}", start_line + 1 + # ) + # success_msg += f"A snippet of core memory block `{label}`:\n{snippet}\n" + success_msg += ( + "Review the changes and make sure they are as expected (correct indentation, " + "no duplicate lines, etc). Edit the memory block again if necessary." + ) + + # return None + return success_msg + + def memory_insert( + agent_state: "AgentState", + actor: User, + label: str, + new_str: str, + insert_line: int = -1, + ) -> str: + """ + The memory_insert command allows you to insert text at a specific location + in a memory block. + + Args: + label (str): Section of the memory to be edited, identified by its label. + new_str (str): The text to insert. + insert_line (int): The line number after which to insert the text (0 for + beginning of file). Defaults to -1 (end of the file). + + Returns: + str: The success message + """ + import re + + if bool(re.search(r"\nLine \d+: ", new_str)): + raise ValueError( + "new_str contains a line number prefix, which is not allowed. Do not " + "include line numbers when calling memory tools (line numbers are for " + "display purposes only)." + ) + if CORE_MEMORY_LINE_NUMBER_WARNING in new_str: + raise ValueError( + "new_str contains a line number warning, which is not allowed. Do not " + "include line number information when calling memory tools (line numbers " + "are for display purposes only)." + ) + + current_value = str(agent_state.memory.get_block(label).value).expandtabs() + new_str = str(new_str).expandtabs() + current_value_lines = current_value.split("\n") + n_lines = len(current_value_lines) + + # Check if we're in range, from 0 (pre-line), to 1 (first line), to n_lines (last line) + if insert_line < 0 or insert_line > n_lines: + raise ValueError( + f"Invalid `insert_line` parameter: {insert_line}. It should be within " + f"the range of lines of the memory block: {[0, n_lines]}, or -1 to " + f"append to the end of the memory block." + ) + + # Insert the new string as a line + SNIPPET_LINES = 3 + new_str_lines = new_str.split("\n") + new_value_lines = current_value_lines[:insert_line] + new_str_lines + current_value_lines[insert_line:] + snippet_lines = ( + current_value_lines[max(0, insert_line - SNIPPET_LINES) : insert_line] + + new_str_lines + + current_value_lines[insert_line : insert_line + SNIPPET_LINES] + ) + + # Collate into the new value to update + new_value = "\n".join(new_value_lines) + snippet = "\n".join(snippet_lines) + + # Write into the block + agent_state.memory.update_block_value(label=label, value=new_value) + + AgentManager().update_memory_if_changed(agent_id=agent_state.id, new_memory=agent_state.memory, actor=actor) + + # Prepare the success message + success_msg = f"The core memory block with label `{label}` has been edited. " + # success_msg += self._make_output( + # snippet, + # "a snippet of the edited file", + # max(1, insert_line - SNIPPET_LINES + 1), + # ) + # success_msg += f"A snippet of core memory block `{label}`:\n{snippet}\n" + success_msg += ( + "Review the changes and make sure they are as expected (correct indentation, " + "no duplicate lines, etc). Edit the memory block again if necessary." + ) + + return success_msg + + def memory_rethink(agent_state: "AgentState", actor: User, label: str, new_memory: str) -> str: + """ + The memory_rethink command allows you to completely rewrite the contents of a + memory block. Use this tool to make large sweeping changes (e.g. when you want + to condense or reorganize the memory blocks), do NOT use this tool to make small + precise edits (e.g. add or remove a line, replace a specific string, etc). + + Args: + label (str): The memory block to be rewritten, identified by its label. + new_memory (str): The new memory contents with information integrated from + existing memory blocks and the conversation context. + + Returns: + str: The success message + """ + import re + + if bool(re.search(r"\nLine \d+: ", new_memory)): + raise ValueError( + "new_memory contains a line number prefix, which is not allowed. Do not " + "include line numbers when calling memory tools (line numbers are for " + "display purposes only)." + ) + if CORE_MEMORY_LINE_NUMBER_WARNING in new_memory: + raise ValueError( + "new_memory contains a line number warning, which is not allowed. Do not " + "include line number information when calling memory tools (line numbers " + "are for display purposes only)." + ) + + if agent_state.memory.get_block(label) is None: + agent_state.memory.create_block(label=label, value=new_memory) + + agent_state.memory.update_block_value(label=label, value=new_memory) + + AgentManager().update_memory_if_changed(agent_id=agent_state.id, new_memory=agent_state.memory, actor=actor) + + # Prepare the success message + success_msg = f"The core memory block with label `{label}` has been edited. " + # success_msg += self._make_output( + # snippet, f"a snippet of {path}", start_line + 1 + # ) + # success_msg += f"A snippet of core memory block `{label}`:\n{snippet}\n" + success_msg += ( + "Review the changes and make sure they are as expected (correct indentation, " + "no duplicate lines, etc). Edit the memory block again if necessary." + ) + + # return None + return success_msg + + def memory_finish_edits(agent_state: "AgentState") -> None: + """ + Call the memory_finish_edits command when you are finished making edits + (integrating all new information) into the memory blocks. This function + is called when the agent is done rethinking the memory. + + Returns: + Optional[str]: None is always returned as this function does not produce a response. + """ + return None + + +class LettaMultiAgentToolExecutor(ToolExecutor): + """Executor for LETTA multi-agent core tools.""" + + # TODO: Implement + # def execute(self, function_name: str, function_args: dict, agent: "Agent", tool: Tool) -> ToolExecutionResult: + # callable_func = get_function_from_module(LETTA_MULTI_AGENT_TOOL_MODULE_NAME, function_name) + # function_args["self"] = agent # need to attach self to arg since it's dynamically linked + # function_response = callable_func(**function_args) + # return ToolExecutionResult(func_return=function_response) + class ExternalComposioToolExecutor(ToolExecutor): """Executor for external Composio tools."""