diff --git a/letta/agents/letta_agent.py b/letta/agents/letta_agent.py index 836f7af6..56d8121a 100644 --- a/letta/agents/letta_agent.py +++ b/letta/agents/letta_agent.py @@ -775,6 +775,7 @@ class LettaAgent(BaseAgent): ToolType.LETTA_SLEEPTIME_CORE, ToolType.LETTA_VOICE_SLEEPTIME_CORE, ToolType.LETTA_BUILTIN, + ToolType.LETTA_FILES_CORE, ToolType.EXTERNAL_COMPOSIO, ToolType.EXTERNAL_MCP, } diff --git a/letta/constants.py b/letta/constants.py index 51b4b766..3d2ea35a 100644 --- a/letta/constants.py +++ b/letta/constants.py @@ -21,6 +21,15 @@ LETTA_CORE_TOOL_MODULE_NAME = "letta.functions.function_sets.base" LETTA_MULTI_AGENT_TOOL_MODULE_NAME = "letta.functions.function_sets.multi_agent" LETTA_VOICE_TOOL_MODULE_NAME = "letta.functions.function_sets.voice" LETTA_BUILTIN_TOOL_MODULE_NAME = "letta.functions.function_sets.builtin" +LETTA_FILES_TOOL_MODULE_NAME = "letta.functions.function_sets.files" + +LETTA_TOOL_MODULE_NAMES = [ + LETTA_CORE_TOOL_MODULE_NAME, + LETTA_MULTI_AGENT_TOOL_MODULE_NAME, + LETTA_VOICE_TOOL_MODULE_NAME, + LETTA_BUILTIN_TOOL_MODULE_NAME, + LETTA_FILES_TOOL_MODULE_NAME, +] # String in the error message for when the context window is too large @@ -112,6 +121,9 @@ MEMORY_TOOLS_LINE_NUMBER_PREFIX_REGEX = re.compile( # Built in tools BUILTIN_TOOLS = ["run_code", "web_search"] +# Built in tools +FILES_TOOLS = ["web_search", "run_code", "open_file", "close_file", "grep", "search_files"] + # Set of all built-in Letta tools LETTA_TOOL_SET = set( BASE_TOOLS @@ -121,6 +133,7 @@ LETTA_TOOL_SET = set( + BASE_VOICE_SLEEPTIME_TOOLS + BASE_VOICE_SLEEPTIME_CHAT_TOOLS + BUILTIN_TOOLS + + FILES_TOOLS ) @@ -294,6 +307,7 @@ CORE_MEMORY_SOURCE_CHAR_LIMIT: int = 5000 # Function return limits FUNCTION_RETURN_CHAR_LIMIT = 6000 # ~300 words BASE_FUNCTION_RETURN_CHAR_LIMIT = 1000000 # very high (we rely on implementation) +FILE_IS_TRUNCATED_WARNING = "# NOTE: This block is truncated, use functions to view the full content." MAX_PAUSE_HEARTBEATS = 360 # in min diff --git a/letta/functions/function_sets/files.py b/letta/functions/function_sets/files.py new file mode 100644 index 00000000..d1c59afe --- /dev/null +++ b/letta/functions/function_sets/files.py @@ -0,0 +1,58 @@ +from typing import TYPE_CHECKING, List, Optional, Tuple + +if TYPE_CHECKING: + from letta.schemas.agent import AgentState + from letta.schemas.file import FileMetadata + + +async def open_file(agent_state: "AgentState", file_name: str, view_range: Optional[Tuple[int, int]]) -> str: + """ + Open up a file in core memory. + + Args: + file_name (str): Name of the file to view. + view_range (Optional[Tuple[int, int]]): Optional tuple indicating range to view. + + Returns: + str: A status message + """ + raise NotImplementedError("Tool not implemented. Please contact the Letta team.") + + +async def close_file(agent_state: "AgentState", file_name: str) -> str: + """ + Close a file in core memory. + + Args: + file_name (str): Name of the file to close. + + Returns: + str: A status message + """ + raise NotImplementedError("Tool not implemented. Please contact the Letta team.") + + +async def grep(agent_state: "AgentState", pattern: str) -> str: + """ + Grep tool to search files across data sources with keywords. + + Args: + pattern (str): Keyword or regex pattern to search. + + Returns: + str: Matching lines or summary output. + """ + raise NotImplementedError("Tool not implemented. Please contact the Letta team.") + + +async def search_files(agent_state: "AgentState", query: str) -> List["FileMetadata"]: + """ + Get list of most relevant files across all data sources. + + Args: + query (str): The search query. + + Returns: + List[FileMetadata]: List of matching files. + """ + raise NotImplementedError("Tool not implemented. Please contact the Letta team.") diff --git a/letta/functions/schema_generator.py b/letta/functions/schema_generator.py index db8ae2a0..c846aa9d 100644 --- a/letta/functions/schema_generator.py +++ b/letta/functions/schema_generator.py @@ -1,6 +1,6 @@ import inspect import warnings -from typing import Any, Dict, List, Optional, Type, Union, get_args, get_origin +from typing import Any, Dict, List, Optional, Tuple, Type, Union, get_args, get_origin from composio.client.collections import ActionParametersModel from docstring_parser import parse @@ -76,6 +76,23 @@ def type_to_json_schema_type(py_type) -> dict: if get_origin(py_type) is Literal: return {"type": "string", "enum": get_args(py_type)} + # Handle tuple types (specifically fixed-length like Tuple[int, int]) + if origin in (tuple, Tuple): + args = get_args(py_type) + if len(args) == 0: + raise ValueError("Tuple type must have at least one element") + + # Support only fixed-length tuples like Tuple[int, int], not variable-length like Tuple[int, ...] + if len(args) == 2 and args[1] is Ellipsis: + raise NotImplementedError("Variable-length tuples (e.g., Tuple[int, ...]) are not supported") + + return { + "type": "array", + "prefixItems": [type_to_json_schema_type(arg) for arg in args], + "minItems": len(args), + "maxItems": len(args), + } + # Handle object types if py_type == dict or origin in (dict, Dict): args = get_args(py_type) diff --git a/letta/llm_api/helpers.py b/letta/llm_api/helpers.py index ed497e09..c5ca5c92 100644 --- a/letta/llm_api/helpers.py +++ b/letta/llm_api/helpers.py @@ -63,11 +63,11 @@ def _convert_to_structured_output_helper(property: dict) -> dict: def convert_to_structured_output(openai_function: dict, allow_optional: bool = False) -> dict: - """Convert function call objects to structured output objects + """Convert function call objects to structured output objects. See: https://platform.openai.com/docs/guides/structured-outputs/supported-schemas """ - description = openai_function["description"] if "description" in openai_function else "" + description = openai_function.get("description", "") structured_output = { "name": openai_function["name"], @@ -81,54 +81,58 @@ def convert_to_structured_output(openai_function: dict, allow_optional: bool = F }, } - # This code needs to be able to handle nested properties - # For example, the param details may have "type" + "description", - # but if "type" is "object" we expected "properties", where each property has details - # and if "type" is "array" we expect "items": for param, details in openai_function["parameters"]["properties"].items(): param_type = details["type"] - description = details.get("description", "") + param_description = details.get("description", "") if param_type == "object": if "properties" not in details: - # Structured outputs requires the properties on dicts be specified ahead of time - raise ValueError(f"Property {param} of type object is missing properties") + raise ValueError(f"Property {param} of type object is missing 'properties'") structured_output["parameters"]["properties"][param] = { "type": "object", - "description": description, + "description": param_description, "properties": {k: _convert_to_structured_output_helper(v) for k, v in details["properties"].items()}, "additionalProperties": False, "required": list(details["properties"].keys()), } elif param_type == "array": - structured_output["parameters"]["properties"][param] = { - "type": "array", - "description": description, - "items": _convert_to_structured_output_helper(details["items"]), - } + items_schema = details.get("items") + prefix_items_schema = details.get("prefixItems") + + if prefix_items_schema: + # assume fixed-length tuple — safe fallback to use first type for items + fallback_item = prefix_items_schema[0] if isinstance(prefix_items_schema, list) else prefix_items_schema + structured_output["parameters"]["properties"][param] = { + "type": "array", + "description": param_description, + "prefixItems": [_convert_to_structured_output_helper(item) for item in prefix_items_schema], + "items": _convert_to_structured_output_helper(fallback_item), + "minItems": details.get("minItems", len(prefix_items_schema)), + "maxItems": details.get("maxItems", len(prefix_items_schema)), + } + elif items_schema: + structured_output["parameters"]["properties"][param] = { + "type": "array", + "description": param_description, + "items": _convert_to_structured_output_helper(items_schema), + } + else: + raise ValueError(f"Array param '{param}' is missing both 'items' and 'prefixItems'") else: - structured_output["parameters"]["properties"][param] = { - "type": param_type, # simple type - "description": description, + prop = { + "type": param_type, + "description": param_description, } - - if "enum" in details: - structured_output["parameters"]["properties"][param]["enum"] = details["enum"] + if "enum" in details: + prop["enum"] = details["enum"] + structured_output["parameters"]["properties"][param] = prop if not allow_optional: - # Add all properties to required list structured_output["parameters"]["required"] = list(structured_output["parameters"]["properties"].keys()) - else: - # See what parameters exist that aren't required - # Those are implied "optional" types - # For those types, turn each of them into a union type with "null" - # e.g. - # "type": "string" -> "type": ["string", "null"] - # TODO - raise NotImplementedError + raise NotImplementedError("Optional parameter handling is not implemented.") return structured_output diff --git a/letta/orm/enums.py b/letta/orm/enums.py index 926af3ea..d34ae687 100644 --- a/letta/orm/enums.py +++ b/letta/orm/enums.py @@ -9,6 +9,7 @@ class ToolType(str, Enum): LETTA_SLEEPTIME_CORE = "letta_sleeptime_core" LETTA_VOICE_SLEEPTIME_CORE = "letta_voice_sleeptime_core" LETTA_BUILTIN = "letta_builtin" + LETTA_FILES_CORE = "letta_files_core" EXTERNAL_COMPOSIO = "external_composio" EXTERNAL_LANGCHAIN = "external_langchain" # TODO is "external" the right name here? Since as of now, MCP is local / doesn't support remote? diff --git a/letta/orm/files_agents.py b/letta/orm/files_agents.py index 61131ce1..a03b8334 100644 --- a/letta/orm/files_agents.py +++ b/letta/orm/files_agents.py @@ -5,6 +5,7 @@ from typing import TYPE_CHECKING, Optional from sqlalchemy import Boolean, DateTime, ForeignKey, Index, String, Text, UniqueConstraint, func from sqlalchemy.orm import Mapped, mapped_column, relationship +from letta.constants import CORE_MEMORY_SOURCE_CHAR_LIMIT, FILE_IS_TRUNCATED_WARNING from letta.orm.mixins import OrganizationMixin from letta.orm.sqlalchemy_base import SqlalchemyBase from letta.schemas.block import Block as PydanticBlock @@ -65,6 +66,13 @@ class FileAgent(SqlalchemyBase, OrganizationMixin): # TODO: This is temporary as we figure out if we want FileBlock as a first class citizen def to_pydantic_block(self) -> PydanticBlock: visible_content = self.visible_content if self.visible_content and self.is_open else "" + + # Truncate content and add warnings here when converting from FileAgent to Block + if len(visible_content) > CORE_MEMORY_SOURCE_CHAR_LIMIT: + truncated_warning = f"\n{FILE_IS_TRUNCATED_WARNING}" + visible_content = visible_content[: CORE_MEMORY_SOURCE_CHAR_LIMIT - len(truncated_warning)] + visible_content += truncated_warning + return PydanticBlock( organization_id=self.organization_id, value=visible_content, diff --git a/letta/schemas/file.py b/letta/schemas/file.py index 2e38b599..e7db7621 100644 --- a/letta/schemas/file.py +++ b/letta/schemas/file.py @@ -72,6 +72,7 @@ class FileAgent(FileAgentBase): ) agent_id: str = Field(..., description="Unique identifier of the agent.") file_id: str = Field(..., description="Unique identifier of the file.") + file_name: str = Field(..., description="Name of the file.") is_open: bool = Field(True, description="True if the agent currently has the file open.") visible_content: Optional[str] = Field( None, diff --git a/letta/schemas/tool.py b/letta/schemas/tool.py index d805f6ec..e7b23e86 100644 --- a/letta/schemas/tool.py +++ b/letta/schemas/tool.py @@ -7,6 +7,7 @@ from letta.constants import ( FUNCTION_RETURN_CHAR_LIMIT, LETTA_BUILTIN_TOOL_MODULE_NAME, LETTA_CORE_TOOL_MODULE_NAME, + LETTA_FILES_TOOL_MODULE_NAME, LETTA_MULTI_AGENT_TOOL_MODULE_NAME, LETTA_VOICE_TOOL_MODULE_NAME, MCP_TOOL_TAG_NAME_PREFIX, @@ -106,6 +107,9 @@ class Tool(BaseTool): elif self.tool_type in {ToolType.LETTA_BUILTIN}: # If it's letta voice tool, we generate the json_schema on the fly here self.json_schema = get_json_schema_from_module(module_name=LETTA_BUILTIN_TOOL_MODULE_NAME, function_name=self.name) + elif self.tool_type in {ToolType.LETTA_FILES_CORE}: + # If it's letta files tool, we generate the json_schema on the fly here + self.json_schema = get_json_schema_from_module(module_name=LETTA_FILES_TOOL_MODULE_NAME, function_name=self.name) elif self.tool_type in {ToolType.EXTERNAL_COMPOSIO}: # Composio schemas handled separately pass diff --git a/letta/server/server.py b/letta/server/server.py index 8e79096e..347a9735 100644 --- a/letta/server/server.py +++ b/letta/server/server.py @@ -21,7 +21,7 @@ import letta.system as system from letta.agent import Agent, save_agent from letta.agents.letta_agent import LettaAgent from letta.config import LettaConfig -from letta.constants import CORE_MEMORY_SOURCE_CHAR_LIMIT, LETTA_TOOL_EXECUTION_DIR +from letta.constants import LETTA_TOOL_EXECUTION_DIR from letta.data_sources.connectors import DataConnector, load_data from letta.errors import HandleNotFoundError from letta.functions.mcp_client.types import MCPServerType, MCPTool, SSEServerConfig, StdioServerConfig @@ -1372,9 +1372,8 @@ class SyncServer(Server): """ Internal method to create or update a file <-> agent association """ - truncated_text = text[:CORE_MEMORY_SOURCE_CHAR_LIMIT] await self.file_agent_manager.attach_file( - agent_id=agent_id, file_id=file_id, file_name=file_name, actor=actor, visible_content=truncated_text + agent_id=agent_id, file_id=file_id, file_name=file_name, actor=actor, visible_content=text ) async def _remove_file_from_agent(self, agent_id: str, file_id: str, actor: User) -> None: diff --git a/letta/services/files_agents_manager.py b/letta/services/files_agents_manager.py index 1d4d0071..5a03f108 100644 --- a/letta/services/files_agents_manager.py +++ b/letta/services/files_agents_manager.py @@ -74,7 +74,7 @@ class FileAgentManager: @enforce_types @trace_method - async def update_file_agent( + async def update_file_agent_by_id( self, *, agent_id: str, @@ -85,7 +85,33 @@ class FileAgentManager: ) -> PydanticFileAgent: """Patch an existing association row.""" async with db_registry.async_session() as session: - assoc = await self._get_association(session, agent_id, file_id, actor) + assoc = await self._get_association_by_file_id(session, agent_id, file_id, actor) + + if is_open is not None: + assoc.is_open = is_open + if visible_content is not None: + assoc.visible_content = visible_content + + # touch timestamp + assoc.last_accessed_at = datetime.now(timezone.utc) + + await assoc.update_async(session, actor=actor) + return assoc.to_pydantic() + + @enforce_types + @trace_method + async def update_file_agent_by_name( + self, + *, + agent_id: str, + file_name: str, + actor: PydanticUser, + is_open: Optional[bool] = None, + visible_content: Optional[str] = None, + ) -> PydanticFileAgent: + """Patch an existing association row.""" + async with db_registry.async_session() as session: + assoc = await self._get_association_by_file_name(session, agent_id, file_name, actor) if is_open is not None: assoc.is_open = is_open @@ -103,15 +129,25 @@ class FileAgentManager: async def detach_file(self, *, agent_id: str, file_id: str, actor: PydanticUser) -> None: """Hard-delete the association.""" async with db_registry.async_session() as session: - assoc = await self._get_association(session, agent_id, file_id, actor) + assoc = await self._get_association_by_file_id(session, agent_id, file_id, actor) await assoc.hard_delete_async(session, actor=actor) @enforce_types @trace_method - async def get_file_agent(self, *, agent_id: str, file_id: str, actor: PydanticUser) -> Optional[PydanticFileAgent]: + async def get_file_agent_by_id(self, *, agent_id: str, file_id: str, actor: PydanticUser) -> Optional[PydanticFileAgent]: async with db_registry.async_session() as session: try: - assoc = await self._get_association(session, agent_id, file_id, actor) + assoc = await self._get_association_by_file_id(session, agent_id, file_id, actor) + return assoc.to_pydantic() + except NoResultFound: + return None + + @enforce_types + @trace_method + async def get_file_agent_by_file_name(self, *, agent_id: str, file_name: str, actor: PydanticUser) -> Optional[PydanticFileAgent]: + async with db_registry.async_session() as session: + try: + assoc = await self._get_association_by_file_name(session, agent_id, file_name, actor) return assoc.to_pydantic() except NoResultFound: return None @@ -173,7 +209,7 @@ class FileAgentManager: await session.execute(stmt) await session.commit() - async def _get_association(self, session, agent_id: str, file_id: str, actor: PydanticUser) -> FileAgentModel: + async def _get_association_by_file_id(self, session, agent_id: str, file_id: str, actor: PydanticUser) -> FileAgentModel: q = select(FileAgentModel).where( and_( FileAgentModel.agent_id == agent_id, @@ -185,3 +221,16 @@ class FileAgentManager: if not assoc: raise NoResultFound(f"FileAgent(agent_id={agent_id}, file_id={file_id}) not found in org {actor.organization_id}") return assoc + + async def _get_association_by_file_name(self, session, agent_id: str, file_name: str, actor: PydanticUser) -> FileAgentModel: + q = select(FileAgentModel).where( + and_( + FileAgentModel.agent_id == agent_id, + FileAgentModel.file_name == file_name, + FileAgentModel.organization_id == actor.organization_id, + ) + ) + assoc = await session.scalar(q) + if not assoc: + raise NoResultFound(f"FileAgent(agent_id={agent_id}, file_name={file_name}) not found in org {actor.organization_id}") + return assoc diff --git a/letta/services/tool_executor/builtin_tool_executor.py b/letta/services/tool_executor/builtin_tool_executor.py new file mode 100644 index 00000000..e1993e19 --- /dev/null +++ b/letta/services/tool_executor/builtin_tool_executor.py @@ -0,0 +1,117 @@ +import json +from textwrap import shorten +from typing import Any, Dict, Literal, Optional + +from letta.constants import WEB_SEARCH_CLIP_CONTENT, WEB_SEARCH_INCLUDE_SCORE, WEB_SEARCH_SEPARATOR +from letta.schemas.agent import AgentState +from letta.schemas.sandbox_config import SandboxConfig +from letta.schemas.tool import Tool +from letta.schemas.tool_execution_result import ToolExecutionResult +from letta.schemas.user import User +from letta.services.tool_executor.tool_executor_base import ToolExecutor +from letta.settings import tool_settings +from letta.tracing import trace_method + + +class LettaBuiltinToolExecutor(ToolExecutor): + """Executor for built in Letta tools.""" + + @trace_method + async def execute( + self, + function_name: str, + function_args: dict, + tool: Tool, + actor: User, + agent_state: Optional[AgentState] = None, + sandbox_config: Optional[SandboxConfig] = None, + sandbox_env_vars: Optional[Dict[str, Any]] = None, + ) -> ToolExecutionResult: + function_map = {"run_code": self.run_code, "web_search": self.web_search} + + if function_name not in function_map: + raise ValueError(f"Unknown function: {function_name}") + + # Execute the appropriate function + function_args_copy = function_args.copy() # Make a copy to avoid modifying the original + function_response = await function_map[function_name](**function_args_copy) + + return ToolExecutionResult( + status="success", + func_return=function_response, + agent_state=agent_state, + ) + + async def run_code(self, code: str, language: Literal["python", "js", "ts", "r", "java"]) -> str: + from e2b_code_interpreter import AsyncSandbox + + if tool_settings.e2b_api_key is None: + raise ValueError("E2B_API_KEY is not set") + + sbx = await AsyncSandbox.create(api_key=tool_settings.e2b_api_key) + params = {"code": code} + if language != "python": + # Leave empty for python + params["language"] = language + + res = self._llm_friendly_result(await sbx.run_code(**params)) + return json.dumps(res, ensure_ascii=False) + + def _llm_friendly_result(self, res): + out = { + "results": [r.text if hasattr(r, "text") else str(r) for r in res.results], + "logs": { + "stdout": getattr(res.logs, "stdout", []), + "stderr": getattr(res.logs, "stderr", []), + }, + } + err = getattr(res, "error", None) + if err is not None: + out["error"] = err + return out + + async def web_search(agent_state: "AgentState", query: str) -> str: + """ + Search the web for information. + Args: + query (str): The query to search the web for. + Returns: + str: The search results. + """ + + try: + from tavily import AsyncTavilyClient + except ImportError: + raise ImportError("tavily is not installed in the tool execution environment") + + # Check if the API key exists + if tool_settings.tavily_api_key is None: + raise ValueError("TAVILY_API_KEY is not set") + + # Instantiate client and search + tavily_client = AsyncTavilyClient(api_key=tool_settings.tavily_api_key) + search_results = await tavily_client.search(query=query, auto_parameters=True) + + results = search_results.get("results", []) + if not results: + return "No search results found." + + # ---- format for the LLM ------------------------------------------------- + formatted_blocks = [] + for idx, item in enumerate(results, start=1): + title = item.get("title") or "Untitled" + url = item.get("url") or "Unknown URL" + # keep each content snippet reasonably short so you don’t blow up context + content = ( + shorten(item.get("content", "").strip(), width=600, placeholder=" …") + if WEB_SEARCH_CLIP_CONTENT + else item.get("content", "").strip() + ) + score = item.get("score") + if WEB_SEARCH_INCLUDE_SCORE: + block = f"\nRESULT {idx}:\n" f"Title: {title}\n" f"URL: {url}\n" f"Relevance score: {score:.4f}\n" f"Content: {content}\n" + else: + block = f"\nRESULT {idx}:\n" f"Title: {title}\n" f"URL: {url}\n" f"Content: {content}\n" + formatted_blocks.append(block) + + return WEB_SEARCH_SEPARATOR.join(formatted_blocks) diff --git a/letta/services/tool_executor/composio_tool_executor.py b/letta/services/tool_executor/composio_tool_executor.py new file mode 100644 index 00000000..4f4dddb8 --- /dev/null +++ b/letta/services/tool_executor/composio_tool_executor.py @@ -0,0 +1,53 @@ +from typing import Any, Dict, Optional + +from letta.constants import COMPOSIO_ENTITY_ENV_VAR_KEY +from letta.functions.composio_helpers import execute_composio_action_async, generate_composio_action_from_func_name +from letta.helpers.composio_helpers import get_composio_api_key_async +from letta.schemas.agent import AgentState +from letta.schemas.sandbox_config import SandboxConfig +from letta.schemas.tool import Tool +from letta.schemas.tool_execution_result import ToolExecutionResult +from letta.schemas.user import User +from letta.services.tool_executor.tool_executor_base import ToolExecutor +from letta.tracing import trace_method + + +class ExternalComposioToolExecutor(ToolExecutor): + """Executor for external Composio tools.""" + + @trace_method + async def execute( + self, + function_name: str, + function_args: dict, + tool: Tool, + actor: User, + agent_state: Optional[AgentState] = None, + sandbox_config: Optional[SandboxConfig] = None, + sandbox_env_vars: Optional[Dict[str, Any]] = None, + ) -> ToolExecutionResult: + assert agent_state is not None, "Agent state is required for external Composio tools" + action_name = generate_composio_action_from_func_name(tool.name) + + # Get entity ID from the agent_state + entity_id = self._get_entity_id(agent_state) + + # Get composio_api_key + composio_api_key = await get_composio_api_key_async(actor=actor) + + # TODO (matt): Roll in execute_composio_action into this class + function_response = await execute_composio_action_async( + action_name=action_name, args=function_args, api_key=composio_api_key, entity_id=entity_id + ) + + return ToolExecutionResult( + status="success", + func_return=function_response, + ) + + def _get_entity_id(self, agent_state: AgentState) -> Optional[str]: + """Extract the entity ID from environment variables.""" + for env_var in agent_state.tool_exec_environment_variables: + if env_var.key == COMPOSIO_ENTITY_ENV_VAR_KEY: + return env_var.value + return None diff --git a/letta/services/tool_executor/core_tool_executor.py b/letta/services/tool_executor/core_tool_executor.py new file mode 100644 index 00000000..fddd4c6f --- /dev/null +++ b/letta/services/tool_executor/core_tool_executor.py @@ -0,0 +1,474 @@ +import math +from typing import Any, Dict, Optional + +from letta.constants import ( + CORE_MEMORY_LINE_NUMBER_WARNING, + MEMORY_TOOLS_LINE_NUMBER_PREFIX_REGEX, + READ_ONLY_BLOCK_EDIT_ERROR, + RETRIEVAL_QUERY_DEFAULT_PAGE_SIZE, +) +from letta.helpers.json_helpers import json_dumps +from letta.schemas.agent import AgentState +from letta.schemas.sandbox_config import SandboxConfig +from letta.schemas.tool import Tool +from letta.schemas.tool_execution_result import ToolExecutionResult +from letta.schemas.user import User +from letta.services.agent_manager import AgentManager +from letta.services.message_manager import MessageManager +from letta.services.passage_manager import PassageManager +from letta.services.tool_executor.tool_executor_base import ToolExecutor +from letta.utils import get_friendly_error_msg + + +class LettaCoreToolExecutor(ToolExecutor): + """Executor for LETTA core tools with direct implementation of functions.""" + + async def execute( + self, + function_name: str, + function_args: dict, + tool: Tool, + actor: User, + agent_state: Optional[AgentState] = None, + sandbox_config: Optional[SandboxConfig] = None, + sandbox_env_vars: Optional[Dict[str, Any]] = None, + ) -> ToolExecutionResult: + # Map function names to method calls + assert agent_state is not None, "Agent state is required for core tools" + function_map = { + "send_message": self.send_message, + "conversation_search": self.conversation_search, + "archival_memory_search": self.archival_memory_search, + "archival_memory_insert": self.archival_memory_insert, + "core_memory_append": self.core_memory_append, + "core_memory_replace": self.core_memory_replace, + "memory_replace": self.memory_replace, + "memory_insert": self.memory_insert, + "memory_rethink": self.memory_rethink, + "memory_finish_edits": self.memory_finish_edits, + } + + if function_name not in function_map: + raise ValueError(f"Unknown function: {function_name}") + + # Execute the appropriate function + function_args_copy = function_args.copy() # Make a copy to avoid modifying the original + try: + function_response = await function_map[function_name](agent_state, actor, **function_args_copy) + return ToolExecutionResult( + status="success", + func_return=function_response, + agent_state=agent_state, + ) + except Exception as e: + return ToolExecutionResult( + status="error", + func_return=e, + agent_state=agent_state, + stderr=[get_friendly_error_msg(function_name=function_name, exception_name=type(e).__name__, exception_message=str(e))], + ) + + async def send_message(self, agent_state: AgentState, actor: User, message: str) -> Optional[str]: + """ + Sends a message to the human user. + + Args: + message (str): Message contents. All unicode (including emojis) are supported. + + Returns: + Optional[str]: None is always returned as this function does not produce a response. + """ + return "Sent message successfully." + + async def conversation_search(self, agent_state: AgentState, actor: User, query: str, page: Optional[int] = 0) -> Optional[str]: + """ + Search prior conversation history using case-insensitive string matching. + + Args: + query (str): String to search for. + page (int): Allows you to page through results. Only use on a follow-up query. Defaults to 0 (first page). + + Returns: + str: Query result string + """ + if page is None or (isinstance(page, str) and page.lower().strip() == "none"): + page = 0 + try: + page = int(page) + except: + raise ValueError(f"'page' argument must be an integer") + + count = RETRIEVAL_QUERY_DEFAULT_PAGE_SIZE + messages = await MessageManager().list_user_messages_for_agent_async( + agent_id=agent_state.id, + actor=actor, + query_text=query, + limit=count, + ) + + total = len(messages) + num_pages = math.ceil(total / count) - 1 # 0 index + + if len(messages) == 0: + results_str = f"No results found." + else: + results_pref = f"Showing {len(messages)} of {total} results (page {page}/{num_pages}):" + results_formatted = [message.content[0].text for message in messages] + results_str = f"{results_pref} {json_dumps(results_formatted)}" + + return results_str + + async def archival_memory_search( + self, agent_state: AgentState, actor: User, query: str, page: Optional[int] = 0, start: Optional[int] = 0 + ) -> Optional[str]: + """ + Search archival memory using semantic (embedding-based) search. + + Args: + query (str): String to search for. + page (Optional[int]): Allows you to page through results. Only use on a follow-up query. Defaults to 0 (first page). + start (Optional[int]): Starting index for the search results. Defaults to 0. + + Returns: + str: Query result string + """ + if page is None or (isinstance(page, str) and page.lower().strip() == "none"): + page = 0 + try: + page = int(page) + except: + raise ValueError(f"'page' argument must be an integer") + + count = RETRIEVAL_QUERY_DEFAULT_PAGE_SIZE + + try: + # Get results using passage manager + all_results = await AgentManager().list_passages_async( + actor=actor, + agent_id=agent_state.id, + query_text=query, + limit=count + start, # Request enough results to handle offset + embedding_config=agent_state.embedding_config, + embed_query=True, + ) + + # Apply pagination + end = min(count + start, len(all_results)) + paged_results = all_results[start:end] + + # Format results to match previous implementation + formatted_results = [{"timestamp": str(result.created_at), "content": result.text} for result in paged_results] + + return formatted_results, len(formatted_results) + + except Exception as e: + raise e + + async def archival_memory_insert(self, agent_state: AgentState, actor: User, content: str) -> Optional[str]: + """ + Add to archival memory. Make sure to phrase the memory contents such that it can be easily queried later. + + Args: + content (str): Content to write to the memory. All unicode (including emojis) are supported. + + Returns: + Optional[str]: None is always returned as this function does not produce a response. + """ + await PassageManager().insert_passage_async( + agent_state=agent_state, + agent_id=agent_state.id, + text=content, + actor=actor, + ) + await AgentManager().rebuild_system_prompt_async(agent_id=agent_state.id, actor=actor, force=True) + return None + + async def core_memory_append(self, agent_state: AgentState, actor: User, label: str, content: str) -> Optional[str]: + """ + Append to the contents of core memory. + + Args: + label (str): Section of the memory to be edited (persona or human). + content (str): Content to write to the memory. All unicode (including emojis) are supported. + + Returns: + Optional[str]: None is always returned as this function does not produce a response. + """ + if agent_state.memory.get_block(label).read_only: + raise ValueError(f"{READ_ONLY_BLOCK_EDIT_ERROR}") + current_value = str(agent_state.memory.get_block(label).value) + new_value = current_value + "\n" + str(content) + agent_state.memory.update_block_value(label=label, value=new_value) + await AgentManager().update_memory_if_changed_async(agent_id=agent_state.id, new_memory=agent_state.memory, actor=actor) + return None + + async def core_memory_replace( + self, + agent_state: AgentState, + actor: User, + label: str, + old_content: str, + new_content: str, + ) -> Optional[str]: + """ + Replace the contents of core memory. To delete memories, use an empty string for new_content. + + Args: + label (str): Section of the memory to be edited (persona or human). + old_content (str): String to replace. Must be an exact match. + new_content (str): Content to write to the memory. All unicode (including emojis) are supported. + + Returns: + Optional[str]: None is always returned as this function does not produce a response. + """ + if agent_state.memory.get_block(label).read_only: + raise ValueError(f"{READ_ONLY_BLOCK_EDIT_ERROR}") + current_value = str(agent_state.memory.get_block(label).value) + if old_content not in current_value: + raise ValueError(f"Old content '{old_content}' not found in memory block '{label}'") + new_value = current_value.replace(str(old_content), str(new_content)) + agent_state.memory.update_block_value(label=label, value=new_value) + await AgentManager().update_memory_if_changed_async(agent_id=agent_state.id, new_memory=agent_state.memory, actor=actor) + return None + + async def memory_replace( + self, + agent_state: AgentState, + actor: User, + label: str, + old_str: str, + new_str: Optional[str] = None, + ) -> str: + """ + The memory_replace command allows you to replace a specific string in a memory + block with a new string. This is used for making precise edits. + + Args: + label (str): Section of the memory to be edited, identified by its label. + old_str (str): The text to replace (must match exactly, including whitespace + and indentation). Do not include line number prefixes. + new_str (Optional[str]): The new text to insert in place of the old text. + Omit this argument to delete the old_str. Do not include line number prefixes. + + Returns: + str: The success message + """ + + if agent_state.memory.get_block(label).read_only: + raise ValueError(f"{READ_ONLY_BLOCK_EDIT_ERROR}") + + if bool(MEMORY_TOOLS_LINE_NUMBER_PREFIX_REGEX.search(old_str)): + raise ValueError( + "old_str contains a line number prefix, which is not allowed. " + "Do not include line numbers when calling memory tools (line " + "numbers are for display purposes only)." + ) + if CORE_MEMORY_LINE_NUMBER_WARNING in old_str: + raise ValueError( + "old_str contains a line number warning, which is not allowed. " + "Do not include line number information when calling memory tools " + "(line numbers are for display purposes only)." + ) + if bool(MEMORY_TOOLS_LINE_NUMBER_PREFIX_REGEX.search(new_str)): + raise ValueError( + "new_str contains a line number prefix, which is not allowed. " + "Do not include line numbers when calling memory tools (line " + "numbers are for display purposes only)." + ) + + old_str = str(old_str).expandtabs() + new_str = str(new_str).expandtabs() + current_value = str(agent_state.memory.get_block(label).value).expandtabs() + + # Check if old_str is unique in the block + occurences = current_value.count(old_str) + if occurences == 0: + raise ValueError( + f"No replacement was performed, old_str `{old_str}` did not appear " f"verbatim in memory block with label `{label}`." + ) + elif occurences > 1: + content_value_lines = current_value.split("\n") + lines = [idx + 1 for idx, line in enumerate(content_value_lines) if old_str in line] + raise ValueError( + f"No replacement was performed. Multiple occurrences of " + f"old_str `{old_str}` in lines {lines}. Please ensure it is unique." + ) + + # Replace old_str with new_str + new_value = current_value.replace(str(old_str), str(new_str)) + + # Write the new content to the block + agent_state.memory.update_block_value(label=label, value=new_value) + + await AgentManager().update_memory_if_changed_async(agent_id=agent_state.id, new_memory=agent_state.memory, actor=actor) + + # Create a snippet of the edited section + SNIPPET_LINES = 3 + replacement_line = current_value.split(old_str)[0].count("\n") + start_line = max(0, replacement_line - SNIPPET_LINES) + end_line = replacement_line + SNIPPET_LINES + new_str.count("\n") + snippet = "\n".join(new_value.split("\n")[start_line : end_line + 1]) + + # Prepare the success message + success_msg = f"The core memory block with label `{label}` has been edited. " + # success_msg += self._make_output( + # snippet, f"a snippet of {path}", start_line + 1 + # ) + # success_msg += f"A snippet of core memory block `{label}`:\n{snippet}\n" + success_msg += ( + "Review the changes and make sure they are as expected (correct indentation, " + "no duplicate lines, etc). Edit the memory block again if necessary." + ) + + # return None + return success_msg + + async def memory_insert( + self, + agent_state: AgentState, + actor: User, + label: str, + new_str: str, + insert_line: int = -1, + ) -> str: + """ + The memory_insert command allows you to insert text at a specific location + in a memory block. + + Args: + label (str): Section of the memory to be edited, identified by its label. + new_str (str): The text to insert. Do not include line number prefixes. + insert_line (int): The line number after which to insert the text (0 for + beginning of file). Defaults to -1 (end of the file). + + Returns: + str: The success message + """ + + if agent_state.memory.get_block(label).read_only: + raise ValueError(f"{READ_ONLY_BLOCK_EDIT_ERROR}") + + if bool(MEMORY_TOOLS_LINE_NUMBER_PREFIX_REGEX.search(new_str)): + raise ValueError( + "new_str contains a line number prefix, which is not allowed. Do not " + "include line numbers when calling memory tools (line numbers are for " + "display purposes only)." + ) + if CORE_MEMORY_LINE_NUMBER_WARNING in new_str: + raise ValueError( + "new_str contains a line number warning, which is not allowed. Do not " + "include line number information when calling memory tools (line numbers " + "are for display purposes only)." + ) + + current_value = str(agent_state.memory.get_block(label).value).expandtabs() + new_str = str(new_str).expandtabs() + current_value_lines = current_value.split("\n") + n_lines = len(current_value_lines) + + # Check if we're in range, from 0 (pre-line), to 1 (first line), to n_lines (last line) + if insert_line == -1: + insert_line = n_lines + elif insert_line < 0 or insert_line > n_lines: + raise ValueError( + f"Invalid `insert_line` parameter: {insert_line}. It should be within " + f"the range of lines of the memory block: {[0, n_lines]}, or -1 to " + f"append to the end of the memory block." + ) + + # Insert the new string as a line + SNIPPET_LINES = 3 + new_str_lines = new_str.split("\n") + new_value_lines = current_value_lines[:insert_line] + new_str_lines + current_value_lines[insert_line:] + snippet_lines = ( + current_value_lines[max(0, insert_line - SNIPPET_LINES) : insert_line] + + new_str_lines + + current_value_lines[insert_line : insert_line + SNIPPET_LINES] + ) + + # Collate into the new value to update + new_value = "\n".join(new_value_lines) + snippet = "\n".join(snippet_lines) + + # Write into the block + agent_state.memory.update_block_value(label=label, value=new_value) + + await AgentManager().update_memory_if_changed_async(agent_id=agent_state.id, new_memory=agent_state.memory, actor=actor) + + # Prepare the success message + success_msg = f"The core memory block with label `{label}` has been edited. " + # success_msg += self._make_output( + # snippet, + # "a snippet of the edited file", + # max(1, insert_line - SNIPPET_LINES + 1), + # ) + # success_msg += f"A snippet of core memory block `{label}`:\n{snippet}\n" + success_msg += ( + "Review the changes and make sure they are as expected (correct indentation, " + "no duplicate lines, etc). Edit the memory block again if necessary." + ) + + return success_msg + + async def memory_rethink(self, agent_state: AgentState, actor: User, label: str, new_memory: str) -> str: + """ + The memory_rethink command allows you to completely rewrite the contents of a + memory block. Use this tool to make large sweeping changes (e.g. when you want + to condense or reorganize the memory blocks), do NOT use this tool to make small + precise edits (e.g. add or remove a line, replace a specific string, etc). + + Args: + label (str): The memory block to be rewritten, identified by its label. + new_memory (str): The new memory contents with information integrated from + existing memory blocks and the conversation context. Do not include line number prefixes. + + Returns: + str: The success message + """ + if agent_state.memory.get_block(label).read_only: + raise ValueError(f"{READ_ONLY_BLOCK_EDIT_ERROR}") + + if bool(MEMORY_TOOLS_LINE_NUMBER_PREFIX_REGEX.search(new_memory)): + raise ValueError( + "new_memory contains a line number prefix, which is not allowed. Do not " + "include line numbers when calling memory tools (line numbers are for " + "display purposes only)." + ) + if CORE_MEMORY_LINE_NUMBER_WARNING in new_memory: + raise ValueError( + "new_memory contains a line number warning, which is not allowed. Do not " + "include line number information when calling memory tools (line numbers " + "are for display purposes only)." + ) + + if agent_state.memory.get_block(label) is None: + agent_state.memory.create_block(label=label, value=new_memory) + + agent_state.memory.update_block_value(label=label, value=new_memory) + + await AgentManager().update_memory_if_changed_async(agent_id=agent_state.id, new_memory=agent_state.memory, actor=actor) + + # Prepare the success message + success_msg = f"The core memory block with label `{label}` has been edited. " + # success_msg += self._make_output( + # snippet, f"a snippet of {path}", start_line + 1 + # ) + # success_msg += f"A snippet of core memory block `{label}`:\n{snippet}\n" + success_msg += ( + "Review the changes and make sure they are as expected (correct indentation, " + "no duplicate lines, etc). Edit the memory block again if necessary." + ) + + # return None + return success_msg + + async def memory_finish_edits(self, agent_state: AgentState, actor: User) -> None: + """ + Call the memory_finish_edits command when you are finished making edits + (integrating all new information) into the memory blocks. This function + is called when the agent is done rethinking the memory. + + Returns: + Optional[str]: None is always returned as this function does not produce a response. + """ + return None diff --git a/letta/services/tool_executor/files_tool_executor.py b/letta/services/tool_executor/files_tool_executor.py new file mode 100644 index 00000000..ec3ab6b5 --- /dev/null +++ b/letta/services/tool_executor/files_tool_executor.py @@ -0,0 +1,130 @@ +from typing import Any, Dict, List, Optional, Tuple + +from letta.schemas.agent import AgentState +from letta.schemas.sandbox_config import SandboxConfig +from letta.schemas.tool import Tool +from letta.schemas.tool_execution_result import ToolExecutionResult +from letta.schemas.user import User +from letta.services.agent_manager import AgentManager +from letta.services.block_manager import BlockManager +from letta.services.files_agents_manager import FileAgentManager +from letta.services.message_manager import MessageManager +from letta.services.passage_manager import PassageManager +from letta.services.source_manager import SourceManager +from letta.services.tool_executor.tool_executor_base import ToolExecutor +from letta.utils import get_friendly_error_msg + + +class LettaFileToolExecutor(ToolExecutor): + """Executor for Letta file tools with direct implementation of functions.""" + + def __init__( + self, + message_manager: MessageManager, + agent_manager: AgentManager, + block_manager: BlockManager, + passage_manager: PassageManager, + actor: User, + ): + super().__init__( + message_manager=message_manager, + agent_manager=agent_manager, + block_manager=block_manager, + passage_manager=passage_manager, + actor=actor, + ) + + # TODO: This should be passed in to for testing purposes + self.files_agents_manager = FileAgentManager() + self.source_manager = SourceManager() + + async def execute( + self, + function_name: str, + function_args: dict, + tool: Tool, + actor: User, + agent_state: Optional[AgentState] = None, + sandbox_config: Optional[SandboxConfig] = None, + sandbox_env_vars: Optional[Dict[str, Any]] = None, + ) -> ToolExecutionResult: + if agent_state is None: + raise ValueError("Agent state is required for file tools") + + function_map = { + "open_file": self.open_file, + "close_file": self.close_file, + "grep": self.grep, + "search_files": self.search_files, + } + + if function_name not in function_map: + raise ValueError(f"Unknown function: {function_name}") + + function_args_copy = function_args.copy() + try: + func_return = await function_map[function_name](agent_state, **function_args_copy) + return ToolExecutionResult( + status="success", + func_return=func_return, + agent_state=agent_state, + ) + except Exception as e: + return ToolExecutionResult( + status="error", + func_return=e, + agent_state=agent_state, + stderr=[get_friendly_error_msg(function_name=function_name, exception_name=type(e).__name__, exception_message=str(e))], + ) + + async def open_file(self, agent_state: AgentState, file_name: str, view_range: Optional[Tuple[int, int]] = None) -> str: + """Stub for open_file tool.""" + start, end = None, None + if view_range: + start, end = view_range + if start >= end: + raise ValueError(f"Provided view range {view_range} is invalid, starting range must be less than ending range.") + + # TODO: This is inefficient. We can skip the initial DB lookup by preserving on the block metadata what the file_id is + file_agent = await self.files_agents_manager.get_file_agent_by_file_name( + agent_id=agent_state.id, file_name=file_name, actor=self.actor + ) + + if not file_agent: + file_blocks = agent_state.memory.file_blocks + file_names = [fb.label for fb in file_blocks] + raise ValueError( + f"{file_name} not attached - did you get the filename correct? Currently you have the following files attached: {file_names}" + ) + + file_id = file_agent.file_id + file = await self.source_manager.get_file_by_id(file_id=file_id, actor=self.actor, include_content=True) + + # TODO: Inefficient, maybe we can pre-compute this + content_lines = [ + line.strip() for line in file.content.split("\n") if line.strip() # remove leading/trailing whitespace # skip empty lines + ] + + if start and end: + content_lines = content_lines[start:end] + + visible_content = "\n".join(content_lines) + await self.files_agents_manager.update_file_agent_by_id( + agent_id=agent_state.id, file_id=file_id, actor=self.actor, is_open=True, visible_content=visible_content + ) + return "Success" + + async def close_file(self, agent_state: AgentState, file_name: str) -> str: + """Stub for close_file tool.""" + await self.files_agents_manager.update_file_agent_by_name( + agent_id=agent_state.id, file_name=file_name, actor=self.actor, is_open=False + ) + return "Success" + + async def grep(self, agent_state: AgentState, pattern: str) -> str: + """Stub for grep tool.""" + raise NotImplementedError + + async def search_files(self, agent_state: AgentState, query: str) -> List[Any]: + """Stub for search_files tool.""" + raise NotImplementedError diff --git a/letta/services/tool_executor/mcp_tool_executor.py b/letta/services/tool_executor/mcp_tool_executor.py new file mode 100644 index 00000000..eaef6ea3 --- /dev/null +++ b/letta/services/tool_executor/mcp_tool_executor.py @@ -0,0 +1,45 @@ +from typing import Any, Dict, Optional + +from letta.constants import MCP_TOOL_TAG_NAME_PREFIX +from letta.schemas.agent import AgentState +from letta.schemas.sandbox_config import SandboxConfig +from letta.schemas.tool import Tool +from letta.schemas.tool_execution_result import ToolExecutionResult +from letta.schemas.user import User +from letta.services.mcp_manager import MCPManager +from letta.services.tool_executor.tool_executor_base import ToolExecutor +from letta.tracing import trace_method + + +class ExternalMCPToolExecutor(ToolExecutor): + """Executor for external MCP tools.""" + + @trace_method + async def execute( + self, + function_name: str, + function_args: dict, + tool: Tool, + actor: User, + agent_state: Optional[AgentState] = None, + sandbox_config: Optional[SandboxConfig] = None, + sandbox_env_vars: Optional[Dict[str, Any]] = None, + ) -> ToolExecutionResult: + + pass + + mcp_server_tag = [tag for tag in tool.tags if tag.startswith(f"{MCP_TOOL_TAG_NAME_PREFIX}:")] + if not mcp_server_tag: + raise ValueError(f"Tool {tool.name} does not have a valid MCP server tag") + mcp_server_name = mcp_server_tag[0].split(":")[1] + + mcp_manager = MCPManager() + # TODO: may need to have better client connection management + function_response, success = await mcp_manager.execute_mcp_server_tool( + mcp_server_name=mcp_server_name, tool_name=function_name, tool_args=function_args, actor=actor + ) + + return ToolExecutionResult( + status="success" if success else "error", + func_return=function_response, + ) diff --git a/letta/services/tool_executor/multi_agent_tool_executor.py b/letta/services/tool_executor/multi_agent_tool_executor.py new file mode 100644 index 00000000..02accc2e --- /dev/null +++ b/letta/services/tool_executor/multi_agent_tool_executor.py @@ -0,0 +1,123 @@ +import asyncio +from typing import Any, Dict, List, Optional + +from letta.schemas.agent import AgentState +from letta.schemas.enums import MessageRole +from letta.schemas.letta_message import AssistantMessage +from letta.schemas.letta_message_content import TextContent +from letta.schemas.message import MessageCreate +from letta.schemas.sandbox_config import SandboxConfig +from letta.schemas.tool import Tool +from letta.schemas.tool_execution_result import ToolExecutionResult +from letta.schemas.user import User +from letta.services.tool_executor.tool_executor import logger +from letta.services.tool_executor.tool_executor_base import ToolExecutor + + +class LettaMultiAgentToolExecutor(ToolExecutor): + """Executor for LETTA multi-agent core tools.""" + + async def execute( + self, + function_name: str, + function_args: dict, + tool: Tool, + actor: User, + agent_state: Optional[AgentState] = None, + sandbox_config: Optional[SandboxConfig] = None, + sandbox_env_vars: Optional[Dict[str, Any]] = None, + ) -> ToolExecutionResult: + assert agent_state is not None, "Agent state is required for multi-agent tools" + function_map = { + "send_message_to_agent_and_wait_for_reply": self.send_message_to_agent_and_wait_for_reply, + "send_message_to_agent_async": self.send_message_to_agent_async, + "send_message_to_agents_matching_tags": self.send_message_to_agents_matching_tags_async, + } + + if function_name not in function_map: + raise ValueError(f"Unknown function: {function_name}") + + # Execute the appropriate function + function_args_copy = function_args.copy() # Make a copy to avoid modifying the original + function_response = await function_map[function_name](agent_state, **function_args_copy) + return ToolExecutionResult( + status="success", + func_return=function_response, + ) + + async def send_message_to_agent_and_wait_for_reply(self, agent_state: AgentState, message: str, other_agent_id: str) -> str: + augmented_message = ( + f"[Incoming message from agent with ID '{agent_state.id}' - to reply to this message, " + f"make sure to use the 'send_message' at the end, and the system will notify the sender of your response] " + f"{message}" + ) + + return str(await self._process_agent(agent_id=other_agent_id, message=augmented_message)) + + async def send_message_to_agent_async(self, agent_state: AgentState, message: str, other_agent_id: str) -> str: + # 1) Build the prefixed system‐message + prefixed = ( + f"[Incoming message from agent with ID '{agent_state.id}' - " + f"to reply to this message, make sure to use the " + f"'send_message_to_agent_async' tool, or the agent will not receive your message] " + f"{message}" + ) + + task = asyncio.create_task(self._process_agent(agent_id=other_agent_id, message=prefixed)) + + task.add_done_callback(lambda t: (logger.error(f"Async send_message task failed: {t.exception()}") if t.exception() else None)) + + return "Successfully sent message" + + async def send_message_to_agents_matching_tags_async( + self, agent_state: AgentState, message: str, match_all: List[str], match_some: List[str] + ) -> str: + # Find matching agents + matching_agents = await self.agent_manager.list_agents_matching_tags_async( + actor=self.actor, match_all=match_all, match_some=match_some + ) + if not matching_agents: + return str([]) + + augmented_message = ( + "[Incoming message from external Letta agent - to reply to this message, " + "make sure to use the 'send_message' at the end, and the system will notify " + "the sender of your response] " + f"{message}" + ) + + tasks = [ + asyncio.create_task(self._process_agent(agent_id=agent_state.id, message=augmented_message)) for agent_state in matching_agents + ] + results = await asyncio.gather(*tasks) + return str(results) + + async def _process_agent(self, agent_id: str, message: str) -> Dict[str, Any]: + from letta.agents.letta_agent import LettaAgent + + try: + letta_agent = LettaAgent( + agent_id=agent_id, + message_manager=self.message_manager, + agent_manager=self.agent_manager, + block_manager=self.block_manager, + passage_manager=self.passage_manager, + actor=self.actor, + ) + + letta_response = await letta_agent.step([MessageCreate(role=MessageRole.system, content=[TextContent(text=message)])]) + messages = letta_response.messages + + send_message_content = [message.content for message in messages if isinstance(message, AssistantMessage)] + + return { + "agent_id": agent_id, + "response": send_message_content if send_message_content else [""], + } + + except Exception as e: + return { + "agent_id": agent_id, + "error": str(e), + "type": type(e).__name__, + } diff --git a/letta/services/tool_executor/tool_execution_manager.py b/letta/services/tool_executor/tool_execution_manager.py index f8aa622e..085019ee 100644 --- a/letta/services/tool_executor/tool_execution_manager.py +++ b/letta/services/tool_executor/tool_execution_manager.py @@ -13,15 +13,14 @@ from letta.services.agent_manager import AgentManager from letta.services.block_manager import BlockManager from letta.services.message_manager import MessageManager from letta.services.passage_manager import PassageManager -from letta.services.tool_executor.tool_executor import ( - ExternalComposioToolExecutor, - ExternalMCPToolExecutor, - LettaBuiltinToolExecutor, - LettaCoreToolExecutor, - LettaMultiAgentToolExecutor, - SandboxToolExecutor, - ToolExecutor, -) +from letta.services.tool_executor.builtin_tool_executor import LettaBuiltinToolExecutor +from letta.services.tool_executor.composio_tool_executor import ExternalComposioToolExecutor +from letta.services.tool_executor.core_tool_executor import LettaCoreToolExecutor +from letta.services.tool_executor.files_tool_executor import LettaFileToolExecutor +from letta.services.tool_executor.mcp_tool_executor import ExternalMCPToolExecutor +from letta.services.tool_executor.multi_agent_tool_executor import LettaMultiAgentToolExecutor +from letta.services.tool_executor.tool_executor import SandboxToolExecutor +from letta.services.tool_executor.tool_executor_base import ToolExecutor from letta.tracing import trace_method from letta.utils import get_friendly_error_msg @@ -35,6 +34,7 @@ class ToolExecutorFactory: ToolType.LETTA_SLEEPTIME_CORE: LettaCoreToolExecutor, ToolType.LETTA_MULTI_AGENT_CORE: LettaMultiAgentToolExecutor, ToolType.LETTA_BUILTIN: LettaBuiltinToolExecutor, + ToolType.LETTA_FILES_CORE: LettaFileToolExecutor, ToolType.EXTERNAL_COMPOSIO: ExternalComposioToolExecutor, ToolType.EXTERNAL_MCP: ExternalMCPToolExecutor, } diff --git a/letta/services/tool_executor/tool_executor.py b/letta/services/tool_executor/tool_executor.py index 7e970146..f34e7d01 100644 --- a/letta/services/tool_executor/tool_executor.py +++ b/letta/services/tool_executor/tool_executor.py @@ -1,41 +1,15 @@ -import asyncio -import json -import math import traceback -from abc import ABC, abstractmethod -from textwrap import shorten -from typing import Any, Dict, List, Literal, Optional +from typing import Any, Dict, Optional -from letta.constants import ( - COMPOSIO_ENTITY_ENV_VAR_KEY, - CORE_MEMORY_LINE_NUMBER_WARNING, - MCP_TOOL_TAG_NAME_PREFIX, - MEMORY_TOOLS_LINE_NUMBER_PREFIX_REGEX, - READ_ONLY_BLOCK_EDIT_ERROR, - RETRIEVAL_QUERY_DEFAULT_PAGE_SIZE, - WEB_SEARCH_CLIP_CONTENT, - WEB_SEARCH_INCLUDE_SCORE, - WEB_SEARCH_SEPARATOR, -) from letta.functions.ast_parsers import coerce_dict_args_by_annotations, get_function_annotations_from_source -from letta.functions.composio_helpers import execute_composio_action_async, generate_composio_action_from_func_name -from letta.helpers.composio_helpers import get_composio_api_key_async -from letta.helpers.json_helpers import json_dumps from letta.log import get_logger from letta.schemas.agent import AgentState -from letta.schemas.enums import MessageRole -from letta.schemas.letta_message import AssistantMessage -from letta.schemas.letta_message_content import TextContent -from letta.schemas.message import MessageCreate from letta.schemas.sandbox_config import SandboxConfig from letta.schemas.tool import Tool from letta.schemas.tool_execution_result import ToolExecutionResult from letta.schemas.user import User from letta.services.agent_manager import AgentManager -from letta.services.block_manager import BlockManager -from letta.services.mcp_manager import MCPManager -from letta.services.message_manager import MessageManager -from letta.services.passage_manager import PassageManager +from letta.services.tool_executor.tool_executor_base import ToolExecutor from letta.services.tool_sandbox.e2b_sandbox import AsyncToolSandboxE2B from letta.services.tool_sandbox.local_sandbox import AsyncToolSandboxLocal from letta.settings import tool_settings @@ -46,675 +20,6 @@ from letta.utils import get_friendly_error_msg logger = get_logger(__name__) -class ToolExecutor(ABC): - """Abstract base class for tool executors.""" - - def __init__( - self, - message_manager: MessageManager, - agent_manager: AgentManager, - block_manager: BlockManager, - passage_manager: PassageManager, - actor: User, - ): - self.message_manager = message_manager - self.agent_manager = agent_manager - self.block_manager = block_manager - self.passage_manager = passage_manager - self.actor = actor - - @abstractmethod - async def execute( - self, - function_name: str, - function_args: dict, - tool: Tool, - actor: User, - agent_state: Optional[AgentState] = None, - sandbox_config: Optional[SandboxConfig] = None, - sandbox_env_vars: Optional[Dict[str, Any]] = None, - ) -> ToolExecutionResult: - """Execute the tool and return the result.""" - - -class LettaCoreToolExecutor(ToolExecutor): - """Executor for LETTA core tools with direct implementation of functions.""" - - async def execute( - self, - function_name: str, - function_args: dict, - tool: Tool, - actor: User, - agent_state: Optional[AgentState] = None, - sandbox_config: Optional[SandboxConfig] = None, - sandbox_env_vars: Optional[Dict[str, Any]] = None, - ) -> ToolExecutionResult: - # Map function names to method calls - assert agent_state is not None, "Agent state is required for core tools" - function_map = { - "send_message": self.send_message, - "conversation_search": self.conversation_search, - "archival_memory_search": self.archival_memory_search, - "archival_memory_insert": self.archival_memory_insert, - "core_memory_append": self.core_memory_append, - "core_memory_replace": self.core_memory_replace, - "memory_replace": self.memory_replace, - "memory_insert": self.memory_insert, - "memory_rethink": self.memory_rethink, - "memory_finish_edits": self.memory_finish_edits, - } - - if function_name not in function_map: - raise ValueError(f"Unknown function: {function_name}") - - # Execute the appropriate function - function_args_copy = function_args.copy() # Make a copy to avoid modifying the original - try: - function_response = await function_map[function_name](agent_state, actor, **function_args_copy) - return ToolExecutionResult( - status="success", - func_return=function_response, - agent_state=agent_state, - ) - except Exception as e: - return ToolExecutionResult( - status="error", - func_return=e, - agent_state=agent_state, - stderr=[get_friendly_error_msg(function_name=function_name, exception_name=type(e).__name__, exception_message=str(e))], - ) - - async def send_message(self, agent_state: AgentState, actor: User, message: str) -> Optional[str]: - """ - Sends a message to the human user. - - Args: - message (str): Message contents. All unicode (including emojis) are supported. - - Returns: - Optional[str]: None is always returned as this function does not produce a response. - """ - return "Sent message successfully." - - async def conversation_search(self, agent_state: AgentState, actor: User, query: str, page: Optional[int] = 0) -> Optional[str]: - """ - Search prior conversation history using case-insensitive string matching. - - Args: - query (str): String to search for. - page (int): Allows you to page through results. Only use on a follow-up query. Defaults to 0 (first page). - - Returns: - str: Query result string - """ - if page is None or (isinstance(page, str) and page.lower().strip() == "none"): - page = 0 - try: - page = int(page) - except: - raise ValueError(f"'page' argument must be an integer") - - count = RETRIEVAL_QUERY_DEFAULT_PAGE_SIZE - messages = await MessageManager().list_user_messages_for_agent_async( - agent_id=agent_state.id, - actor=actor, - query_text=query, - limit=count, - ) - - total = len(messages) - num_pages = math.ceil(total / count) - 1 # 0 index - - if len(messages) == 0: - results_str = f"No results found." - else: - results_pref = f"Showing {len(messages)} of {total} results (page {page}/{num_pages}):" - results_formatted = [message.content[0].text for message in messages] - results_str = f"{results_pref} {json_dumps(results_formatted)}" - - return results_str - - async def archival_memory_search( - self, agent_state: AgentState, actor: User, query: str, page: Optional[int] = 0, start: Optional[int] = 0 - ) -> Optional[str]: - """ - Search archival memory using semantic (embedding-based) search. - - Args: - query (str): String to search for. - page (Optional[int]): Allows you to page through results. Only use on a follow-up query. Defaults to 0 (first page). - start (Optional[int]): Starting index for the search results. Defaults to 0. - - Returns: - str: Query result string - """ - if page is None or (isinstance(page, str) and page.lower().strip() == "none"): - page = 0 - try: - page = int(page) - except: - raise ValueError(f"'page' argument must be an integer") - - count = RETRIEVAL_QUERY_DEFAULT_PAGE_SIZE - - try: - # Get results using passage manager - all_results = await AgentManager().list_passages_async( - actor=actor, - agent_id=agent_state.id, - query_text=query, - limit=count + start, # Request enough results to handle offset - embedding_config=agent_state.embedding_config, - embed_query=True, - ) - - # Apply pagination - end = min(count + start, len(all_results)) - paged_results = all_results[start:end] - - # Format results to match previous implementation - formatted_results = [{"timestamp": str(result.created_at), "content": result.text} for result in paged_results] - - return formatted_results, len(formatted_results) - - except Exception as e: - raise e - - async def archival_memory_insert(self, agent_state: AgentState, actor: User, content: str) -> Optional[str]: - """ - Add to archival memory. Make sure to phrase the memory contents such that it can be easily queried later. - - Args: - content (str): Content to write to the memory. All unicode (including emojis) are supported. - - Returns: - Optional[str]: None is always returned as this function does not produce a response. - """ - await PassageManager().insert_passage_async( - agent_state=agent_state, - agent_id=agent_state.id, - text=content, - actor=actor, - ) - await AgentManager().rebuild_system_prompt_async(agent_id=agent_state.id, actor=actor, force=True) - return None - - async def core_memory_append(self, agent_state: AgentState, actor: User, label: str, content: str) -> Optional[str]: - """ - Append to the contents of core memory. - - Args: - label (str): Section of the memory to be edited (persona or human). - content (str): Content to write to the memory. All unicode (including emojis) are supported. - - Returns: - Optional[str]: None is always returned as this function does not produce a response. - """ - if agent_state.memory.get_block(label).read_only: - raise ValueError(f"{READ_ONLY_BLOCK_EDIT_ERROR}") - current_value = str(agent_state.memory.get_block(label).value) - new_value = current_value + "\n" + str(content) - agent_state.memory.update_block_value(label=label, value=new_value) - await AgentManager().update_memory_if_changed_async(agent_id=agent_state.id, new_memory=agent_state.memory, actor=actor) - return None - - async def core_memory_replace( - self, - agent_state: AgentState, - actor: User, - label: str, - old_content: str, - new_content: str, - ) -> Optional[str]: - """ - Replace the contents of core memory. To delete memories, use an empty string for new_content. - - Args: - label (str): Section of the memory to be edited (persona or human). - old_content (str): String to replace. Must be an exact match. - new_content (str): Content to write to the memory. All unicode (including emojis) are supported. - - Returns: - Optional[str]: None is always returned as this function does not produce a response. - """ - if agent_state.memory.get_block(label).read_only: - raise ValueError(f"{READ_ONLY_BLOCK_EDIT_ERROR}") - current_value = str(agent_state.memory.get_block(label).value) - if old_content not in current_value: - raise ValueError(f"Old content '{old_content}' not found in memory block '{label}'") - new_value = current_value.replace(str(old_content), str(new_content)) - agent_state.memory.update_block_value(label=label, value=new_value) - await AgentManager().update_memory_if_changed_async(agent_id=agent_state.id, new_memory=agent_state.memory, actor=actor) - return None - - async def memory_replace( - self, - agent_state: AgentState, - actor: User, - label: str, - old_str: str, - new_str: Optional[str] = None, - ) -> str: - """ - The memory_replace command allows you to replace a specific string in a memory - block with a new string. This is used for making precise edits. - - Args: - label (str): Section of the memory to be edited, identified by its label. - old_str (str): The text to replace (must match exactly, including whitespace - and indentation). Do not include line number prefixes. - new_str (Optional[str]): The new text to insert in place of the old text. - Omit this argument to delete the old_str. Do not include line number prefixes. - - Returns: - str: The success message - """ - - if agent_state.memory.get_block(label).read_only: - raise ValueError(f"{READ_ONLY_BLOCK_EDIT_ERROR}") - - if bool(MEMORY_TOOLS_LINE_NUMBER_PREFIX_REGEX.search(old_str)): - raise ValueError( - "old_str contains a line number prefix, which is not allowed. " - "Do not include line numbers when calling memory tools (line " - "numbers are for display purposes only)." - ) - if CORE_MEMORY_LINE_NUMBER_WARNING in old_str: - raise ValueError( - "old_str contains a line number warning, which is not allowed. " - "Do not include line number information when calling memory tools " - "(line numbers are for display purposes only)." - ) - if bool(MEMORY_TOOLS_LINE_NUMBER_PREFIX_REGEX.search(new_str)): - raise ValueError( - "new_str contains a line number prefix, which is not allowed. " - "Do not include line numbers when calling memory tools (line " - "numbers are for display purposes only)." - ) - - old_str = str(old_str).expandtabs() - new_str = str(new_str).expandtabs() - current_value = str(agent_state.memory.get_block(label).value).expandtabs() - - # Check if old_str is unique in the block - occurences = current_value.count(old_str) - if occurences == 0: - raise ValueError( - f"No replacement was performed, old_str `{old_str}` did not appear " f"verbatim in memory block with label `{label}`." - ) - elif occurences > 1: - content_value_lines = current_value.split("\n") - lines = [idx + 1 for idx, line in enumerate(content_value_lines) if old_str in line] - raise ValueError( - f"No replacement was performed. Multiple occurrences of " - f"old_str `{old_str}` in lines {lines}. Please ensure it is unique." - ) - - # Replace old_str with new_str - new_value = current_value.replace(str(old_str), str(new_str)) - - # Write the new content to the block - agent_state.memory.update_block_value(label=label, value=new_value) - - await AgentManager().update_memory_if_changed_async(agent_id=agent_state.id, new_memory=agent_state.memory, actor=actor) - - # Create a snippet of the edited section - SNIPPET_LINES = 3 - replacement_line = current_value.split(old_str)[0].count("\n") - start_line = max(0, replacement_line - SNIPPET_LINES) - end_line = replacement_line + SNIPPET_LINES + new_str.count("\n") - snippet = "\n".join(new_value.split("\n")[start_line : end_line + 1]) - - # Prepare the success message - success_msg = f"The core memory block with label `{label}` has been edited. " - # success_msg += self._make_output( - # snippet, f"a snippet of {path}", start_line + 1 - # ) - # success_msg += f"A snippet of core memory block `{label}`:\n{snippet}\n" - success_msg += ( - "Review the changes and make sure they are as expected (correct indentation, " - "no duplicate lines, etc). Edit the memory block again if necessary." - ) - - # return None - return success_msg - - async def memory_insert( - self, - agent_state: AgentState, - actor: User, - label: str, - new_str: str, - insert_line: int = -1, - ) -> str: - """ - The memory_insert command allows you to insert text at a specific location - in a memory block. - - Args: - label (str): Section of the memory to be edited, identified by its label. - new_str (str): The text to insert. Do not include line number prefixes. - insert_line (int): The line number after which to insert the text (0 for - beginning of file). Defaults to -1 (end of the file). - - Returns: - str: The success message - """ - - if agent_state.memory.get_block(label).read_only: - raise ValueError(f"{READ_ONLY_BLOCK_EDIT_ERROR}") - - if bool(MEMORY_TOOLS_LINE_NUMBER_PREFIX_REGEX.search(new_str)): - raise ValueError( - "new_str contains a line number prefix, which is not allowed. Do not " - "include line numbers when calling memory tools (line numbers are for " - "display purposes only)." - ) - if CORE_MEMORY_LINE_NUMBER_WARNING in new_str: - raise ValueError( - "new_str contains a line number warning, which is not allowed. Do not " - "include line number information when calling memory tools (line numbers " - "are for display purposes only)." - ) - - current_value = str(agent_state.memory.get_block(label).value).expandtabs() - new_str = str(new_str).expandtabs() - current_value_lines = current_value.split("\n") - n_lines = len(current_value_lines) - - # Check if we're in range, from 0 (pre-line), to 1 (first line), to n_lines (last line) - if insert_line == -1: - insert_line = n_lines - elif insert_line < 0 or insert_line > n_lines: - raise ValueError( - f"Invalid `insert_line` parameter: {insert_line}. It should be within " - f"the range of lines of the memory block: {[0, n_lines]}, or -1 to " - f"append to the end of the memory block." - ) - - # Insert the new string as a line - SNIPPET_LINES = 3 - new_str_lines = new_str.split("\n") - new_value_lines = current_value_lines[:insert_line] + new_str_lines + current_value_lines[insert_line:] - snippet_lines = ( - current_value_lines[max(0, insert_line - SNIPPET_LINES) : insert_line] - + new_str_lines - + current_value_lines[insert_line : insert_line + SNIPPET_LINES] - ) - - # Collate into the new value to update - new_value = "\n".join(new_value_lines) - snippet = "\n".join(snippet_lines) - - # Write into the block - agent_state.memory.update_block_value(label=label, value=new_value) - - await AgentManager().update_memory_if_changed_async(agent_id=agent_state.id, new_memory=agent_state.memory, actor=actor) - - # Prepare the success message - success_msg = f"The core memory block with label `{label}` has been edited. " - # success_msg += self._make_output( - # snippet, - # "a snippet of the edited file", - # max(1, insert_line - SNIPPET_LINES + 1), - # ) - # success_msg += f"A snippet of core memory block `{label}`:\n{snippet}\n" - success_msg += ( - "Review the changes and make sure they are as expected (correct indentation, " - "no duplicate lines, etc). Edit the memory block again if necessary." - ) - - return success_msg - - async def memory_rethink(self, agent_state: AgentState, actor: User, label: str, new_memory: str) -> str: - """ - The memory_rethink command allows you to completely rewrite the contents of a - memory block. Use this tool to make large sweeping changes (e.g. when you want - to condense or reorganize the memory blocks), do NOT use this tool to make small - precise edits (e.g. add or remove a line, replace a specific string, etc). - - Args: - label (str): The memory block to be rewritten, identified by its label. - new_memory (str): The new memory contents with information integrated from - existing memory blocks and the conversation context. Do not include line number prefixes. - - Returns: - str: The success message - """ - if agent_state.memory.get_block(label).read_only: - raise ValueError(f"{READ_ONLY_BLOCK_EDIT_ERROR}") - - if bool(MEMORY_TOOLS_LINE_NUMBER_PREFIX_REGEX.search(new_memory)): - raise ValueError( - "new_memory contains a line number prefix, which is not allowed. Do not " - "include line numbers when calling memory tools (line numbers are for " - "display purposes only)." - ) - if CORE_MEMORY_LINE_NUMBER_WARNING in new_memory: - raise ValueError( - "new_memory contains a line number warning, which is not allowed. Do not " - "include line number information when calling memory tools (line numbers " - "are for display purposes only)." - ) - - if agent_state.memory.get_block(label) is None: - agent_state.memory.create_block(label=label, value=new_memory) - - agent_state.memory.update_block_value(label=label, value=new_memory) - - await AgentManager().update_memory_if_changed_async(agent_id=agent_state.id, new_memory=agent_state.memory, actor=actor) - - # Prepare the success message - success_msg = f"The core memory block with label `{label}` has been edited. " - # success_msg += self._make_output( - # snippet, f"a snippet of {path}", start_line + 1 - # ) - # success_msg += f"A snippet of core memory block `{label}`:\n{snippet}\n" - success_msg += ( - "Review the changes and make sure they are as expected (correct indentation, " - "no duplicate lines, etc). Edit the memory block again if necessary." - ) - - # return None - return success_msg - - async def memory_finish_edits(self, agent_state: AgentState, actor: User) -> None: - """ - Call the memory_finish_edits command when you are finished making edits - (integrating all new information) into the memory blocks. This function - is called when the agent is done rethinking the memory. - - Returns: - Optional[str]: None is always returned as this function does not produce a response. - """ - return None - - -class LettaMultiAgentToolExecutor(ToolExecutor): - """Executor for LETTA multi-agent core tools.""" - - async def execute( - self, - function_name: str, - function_args: dict, - tool: Tool, - actor: User, - agent_state: Optional[AgentState] = None, - sandbox_config: Optional[SandboxConfig] = None, - sandbox_env_vars: Optional[Dict[str, Any]] = None, - ) -> ToolExecutionResult: - assert agent_state is not None, "Agent state is required for multi-agent tools" - function_map = { - "send_message_to_agent_and_wait_for_reply": self.send_message_to_agent_and_wait_for_reply, - "send_message_to_agent_async": self.send_message_to_agent_async, - "send_message_to_agents_matching_tags": self.send_message_to_agents_matching_tags_async, - } - - if function_name not in function_map: - raise ValueError(f"Unknown function: {function_name}") - - # Execute the appropriate function - function_args_copy = function_args.copy() # Make a copy to avoid modifying the original - function_response = await function_map[function_name](agent_state, **function_args_copy) - return ToolExecutionResult( - status="success", - func_return=function_response, - ) - - async def send_message_to_agent_and_wait_for_reply(self, agent_state: AgentState, message: str, other_agent_id: str) -> str: - augmented_message = ( - f"[Incoming message from agent with ID '{agent_state.id}' - to reply to this message, " - f"make sure to use the 'send_message' at the end, and the system will notify the sender of your response] " - f"{message}" - ) - - return str(await self._process_agent(agent_id=other_agent_id, message=augmented_message)) - - async def send_message_to_agent_async(self, agent_state: AgentState, message: str, other_agent_id: str) -> str: - # 1) Build the prefixed system‐message - prefixed = ( - f"[Incoming message from agent with ID '{agent_state.id}' - " - f"to reply to this message, make sure to use the " - f"'send_message_to_agent_async' tool, or the agent will not receive your message] " - f"{message}" - ) - - task = asyncio.create_task(self._process_agent(agent_id=other_agent_id, message=prefixed)) - - task.add_done_callback(lambda t: (logger.error(f"Async send_message task failed: {t.exception()}") if t.exception() else None)) - - return "Successfully sent message" - - async def send_message_to_agents_matching_tags_async( - self, agent_state: AgentState, message: str, match_all: List[str], match_some: List[str] - ) -> str: - # Find matching agents - matching_agents = await self.agent_manager.list_agents_matching_tags_async( - actor=self.actor, match_all=match_all, match_some=match_some - ) - if not matching_agents: - return str([]) - - augmented_message = ( - "[Incoming message from external Letta agent - to reply to this message, " - "make sure to use the 'send_message' at the end, and the system will notify " - "the sender of your response] " - f"{message}" - ) - - tasks = [ - asyncio.create_task(self._process_agent(agent_id=agent_state.id, message=augmented_message)) for agent_state in matching_agents - ] - results = await asyncio.gather(*tasks) - return str(results) - - async def _process_agent(self, agent_id: str, message: str) -> Dict[str, Any]: - from letta.agents.letta_agent import LettaAgent - - try: - letta_agent = LettaAgent( - agent_id=agent_id, - message_manager=self.message_manager, - agent_manager=self.agent_manager, - block_manager=self.block_manager, - passage_manager=self.passage_manager, - actor=self.actor, - ) - - letta_response = await letta_agent.step([MessageCreate(role=MessageRole.system, content=[TextContent(text=message)])]) - messages = letta_response.messages - - send_message_content = [message.content for message in messages if isinstance(message, AssistantMessage)] - - return { - "agent_id": agent_id, - "response": send_message_content if send_message_content else [""], - } - - except Exception as e: - return { - "agent_id": agent_id, - "error": str(e), - "type": type(e).__name__, - } - - -class ExternalComposioToolExecutor(ToolExecutor): - """Executor for external Composio tools.""" - - @trace_method - async def execute( - self, - function_name: str, - function_args: dict, - tool: Tool, - actor: User, - agent_state: Optional[AgentState] = None, - sandbox_config: Optional[SandboxConfig] = None, - sandbox_env_vars: Optional[Dict[str, Any]] = None, - ) -> ToolExecutionResult: - assert agent_state is not None, "Agent state is required for external Composio tools" - action_name = generate_composio_action_from_func_name(tool.name) - - # Get entity ID from the agent_state - entity_id = self._get_entity_id(agent_state) - - # Get composio_api_key - composio_api_key = await get_composio_api_key_async(actor=actor) - - # TODO (matt): Roll in execute_composio_action into this class - function_response = await execute_composio_action_async( - action_name=action_name, args=function_args, api_key=composio_api_key, entity_id=entity_id - ) - - return ToolExecutionResult( - status="success", - func_return=function_response, - ) - - def _get_entity_id(self, agent_state: AgentState) -> Optional[str]: - """Extract the entity ID from environment variables.""" - for env_var in agent_state.tool_exec_environment_variables: - if env_var.key == COMPOSIO_ENTITY_ENV_VAR_KEY: - return env_var.value - return None - - -class ExternalMCPToolExecutor(ToolExecutor): - """Executor for external MCP tools.""" - - @trace_method - async def execute( - self, - function_name: str, - function_args: dict, - tool: Tool, - actor: User, - agent_state: Optional[AgentState] = None, - sandbox_config: Optional[SandboxConfig] = None, - sandbox_env_vars: Optional[Dict[str, Any]] = None, - ) -> ToolExecutionResult: - - pass - - mcp_server_tag = [tag for tag in tool.tags if tag.startswith(f"{MCP_TOOL_TAG_NAME_PREFIX}:")] - if not mcp_server_tag: - raise ValueError(f"Tool {tool.name} does not have a valid MCP server tag") - mcp_server_name = mcp_server_tag[0].split(":")[1] - - mcp_manager = MCPManager() - # TODO: may need to have better client connection management - function_response, success = await mcp_manager.execute_mcp_server_tool( - mcp_server_name=mcp_server_name, tool_name=function_name, tool_args=function_args, actor=actor - ) - - return ToolExecutionResult( - status="success" if success else "error", - func_return=function_response, - ) - - class SandboxToolExecutor(ToolExecutor): """Executor for sandboxed tools.""" @@ -801,107 +106,3 @@ class SandboxToolExecutor(ToolExecutor): func_return=error_message, stderr=[stderr], ) - - -class LettaBuiltinToolExecutor(ToolExecutor): - """Executor for built in Letta tools.""" - - @trace_method - async def execute( - self, - function_name: str, - function_args: dict, - tool: Tool, - actor: User, - agent_state: Optional[AgentState] = None, - sandbox_config: Optional[SandboxConfig] = None, - sandbox_env_vars: Optional[Dict[str, Any]] = None, - ) -> ToolExecutionResult: - function_map = {"run_code": self.run_code, "web_search": self.web_search} - - if function_name not in function_map: - raise ValueError(f"Unknown function: {function_name}") - - # Execute the appropriate function - function_args_copy = function_args.copy() # Make a copy to avoid modifying the original - function_response = await function_map[function_name](**function_args_copy) - - return ToolExecutionResult( - status="success", - func_return=function_response, - agent_state=agent_state, - ) - - async def run_code(self, code: str, language: Literal["python", "js", "ts", "r", "java"]) -> str: - from e2b_code_interpreter import AsyncSandbox - - if tool_settings.e2b_api_key is None: - raise ValueError("E2B_API_KEY is not set") - - sbx = await AsyncSandbox.create(api_key=tool_settings.e2b_api_key) - params = {"code": code} - if language != "python": - # Leave empty for python - params["language"] = language - - res = self._llm_friendly_result(await sbx.run_code(**params)) - return json.dumps(res, ensure_ascii=False) - - def _llm_friendly_result(self, res): - out = { - "results": [r.text if hasattr(r, "text") else str(r) for r in res.results], - "logs": { - "stdout": getattr(res.logs, "stdout", []), - "stderr": getattr(res.logs, "stderr", []), - }, - } - err = getattr(res, "error", None) - if err is not None: - out["error"] = err - return out - - async def web_search(agent_state: "AgentState", query: str) -> str: - """ - Search the web for information. - Args: - query (str): The query to search the web for. - Returns: - str: The search results. - """ - - try: - from tavily import AsyncTavilyClient - except ImportError: - raise ImportError("tavily is not installed in the tool execution environment") - - # Check if the API key exists - if tool_settings.tavily_api_key is None: - raise ValueError("TAVILY_API_KEY is not set") - - # Instantiate client and search - tavily_client = AsyncTavilyClient(api_key=tool_settings.tavily_api_key) - search_results = await tavily_client.search(query=query, auto_parameters=True) - - results = search_results.get("results", []) - if not results: - return "No search results found." - - # ---- format for the LLM ------------------------------------------------- - formatted_blocks = [] - for idx, item in enumerate(results, start=1): - title = item.get("title") or "Untitled" - url = item.get("url") or "Unknown URL" - # keep each content snippet reasonably short so you don’t blow up context - content = ( - shorten(item.get("content", "").strip(), width=600, placeholder=" …") - if WEB_SEARCH_CLIP_CONTENT - else item.get("content", "").strip() - ) - score = item.get("score") - if WEB_SEARCH_INCLUDE_SCORE: - block = f"\nRESULT {idx}:\n" f"Title: {title}\n" f"URL: {url}\n" f"Relevance score: {score:.4f}\n" f"Content: {content}\n" - else: - block = f"\nRESULT {idx}:\n" f"Title: {title}\n" f"URL: {url}\n" f"Content: {content}\n" - formatted_blocks.append(block) - - return WEB_SEARCH_SEPARATOR.join(formatted_blocks) diff --git a/letta/services/tool_executor/tool_executor_base.py b/letta/services/tool_executor/tool_executor_base.py new file mode 100644 index 00000000..a8a7ccb2 --- /dev/null +++ b/letta/services/tool_executor/tool_executor_base.py @@ -0,0 +1,43 @@ +from abc import ABC, abstractmethod +from typing import Any, Dict, Optional + +from letta.schemas.agent import AgentState +from letta.schemas.sandbox_config import SandboxConfig +from letta.schemas.tool import Tool +from letta.schemas.tool_execution_result import ToolExecutionResult +from letta.schemas.user import User +from letta.services.agent_manager import AgentManager +from letta.services.block_manager import BlockManager +from letta.services.message_manager import MessageManager +from letta.services.passage_manager import PassageManager + + +class ToolExecutor(ABC): + """Abstract base class for tool executors.""" + + def __init__( + self, + message_manager: MessageManager, + agent_manager: AgentManager, + block_manager: BlockManager, + passage_manager: PassageManager, + actor: User, + ): + self.message_manager = message_manager + self.agent_manager = agent_manager + self.block_manager = block_manager + self.passage_manager = passage_manager + self.actor = actor + + @abstractmethod + async def execute( + self, + function_name: str, + function_args: dict, + tool: Tool, + actor: User, + agent_state: Optional[AgentState] = None, + sandbox_config: Optional[SandboxConfig] = None, + sandbox_env_vars: Optional[Dict[str, Any]] = None, + ) -> ToolExecutionResult: + """Execute the tool and return the result.""" diff --git a/letta/services/tool_manager.py b/letta/services/tool_manager.py index 78652d4d..cb0bc7dc 100644 --- a/letta/services/tool_manager.py +++ b/letta/services/tool_manager.py @@ -11,6 +11,8 @@ from letta.constants import ( BASE_VOICE_SLEEPTIME_CHAT_TOOLS, BASE_VOICE_SLEEPTIME_TOOLS, BUILTIN_TOOLS, + FILES_TOOLS, + LETTA_TOOL_MODULE_NAMES, LETTA_TOOL_SET, MCP_TOOL_TAG_NAME_PREFIX, MULTI_AGENT_TOOLS, @@ -368,12 +370,10 @@ class ToolManager: def upsert_base_tools(self, actor: PydanticUser) -> List[PydanticTool]: """Add default tools in base.py and multi_agent.py""" functions_to_schema = {} - module_names = ["base", "multi_agent", "voice", "builtin"] - for module_name in module_names: - full_module_name = f"letta.functions.function_sets.{module_name}" + for module_name in LETTA_TOOL_MODULE_NAMES: try: - module = importlib.import_module(full_module_name) + module = importlib.import_module(module_name) except Exception as e: # Handle other general exceptions raise e @@ -407,6 +407,9 @@ class ToolManager: elif name in BUILTIN_TOOLS: tool_type = ToolType.LETTA_BUILTIN tags = [tool_type.value] + elif name in FILES_TOOLS: + tool_type = ToolType.LETTA_FILES_CORE + tags = [tool_type.value] else: raise ValueError( f"Tool name {name} is not in the list of base tool names: {BASE_TOOLS + BASE_MEMORY_TOOLS + MULTI_AGENT_TOOLS + BASE_SLEEPTIME_TOOLS + BASE_VOICE_SLEEPTIME_TOOLS + BASE_VOICE_SLEEPTIME_CHAT_TOOLS}" @@ -434,12 +437,9 @@ class ToolManager: async def upsert_base_tools_async(self, actor: PydanticUser) -> List[PydanticTool]: """Add default tools in base.py and multi_agent.py""" functions_to_schema = {} - module_names = ["base", "multi_agent", "voice", "builtin"] - - for module_name in module_names: - full_module_name = f"letta.functions.function_sets.{module_name}" + for module_name in LETTA_TOOL_MODULE_NAMES: try: - module = importlib.import_module(full_module_name) + module = importlib.import_module(module_name) except Exception as e: # Handle other general exceptions raise e @@ -473,6 +473,9 @@ class ToolManager: elif name in BUILTIN_TOOLS: tool_type = ToolType.LETTA_BUILTIN tags = [tool_type.value] + elif name in FILES_TOOLS: + tool_type = ToolType.LETTA_FILES_CORE + tags = [tool_type.value] else: raise ValueError( f"Tool name {name} is not in the list of base tool names: {BASE_TOOLS + BASE_MEMORY_TOOLS + MULTI_AGENT_TOOLS + BASE_SLEEPTIME_TOOLS + BASE_VOICE_SLEEPTIME_TOOLS + BASE_VOICE_SLEEPTIME_CHAT_TOOLS}" diff --git a/tests/data/long_test.txt b/tests/data/long_test.txt new file mode 100644 index 00000000..618bcc4c --- /dev/null +++ b/tests/data/long_test.txt @@ -0,0 +1,412 @@ +testEnrico Letta (Italian: [enˈriːko ˈlɛtta]; born 20 August 1966) is an Italian politician who served as Prime Minister of Italy from April 2013 to February 2014, leading a grand coalition of centre-left and centre-right parties.[1] He was the leader of the Democratic Party (PD) from March 2021 to March 2023.[2] + +After working as an academic, Letta entered politics in 1998 when he was appointed to the Cabinet as Minister for the Community Policies, a role he held until 1999 when he was promoted to become Minister of Industry, Commerce, and Crafts. In 2001, he left the Cabinet upon his election to the Chamber of Deputies. From 2006 to 2008, he was appointed Secretary of the Council of Ministers.[3] In 2007, Letta was one of the senior founding members of the Democratic Party, and in 2009 was elected as its Deputy Secretary.[4] + +After the 2013 Italian general election produced an inconclusive result, and following negotiations between party leaders, President Giorgio Napolitano gave him the task of forming a national unity government (Letta Cabinet), composed of Letta's PD, the centre-right The People of Freedom (PdL), and the centrist Civic Choice, in order to mitigate the economic and social crises engulfing Italy as a result of the Great Recession. Following an agreement between parties, Letta resigned as PD Deputy Secretary and was appointed Prime Minister of Italy on 28 April 2013.[5][6] His government tried to promote economic recovery by securing a funding deal from the European Union to alleviate youth unemployment and abolished the party subsidies, something seen as a watershed moment for Italian politics, which for years had depended upon public funds.[7][8][9] Letta also faced the early stages of the 2015 European migrant crisis, including the 2013 Lampedusa migrant shipwreck, the deadliest shipwreck in the recent history of the Mediterranean Sea; in response, Letta implemented Operation Mare Nostrum to patrol the maritime borders and rescue migrants.[10] + +In November 2013, PdL leader Silvio Berlusconi attempted to withdraw his party's support from the government in order to bring about a change of prime minister; in response, all of the cabinet's centre-right ministers chose to leave the PdL and formed a new party, saying they wished to continue supporting Letta. Despite securing his position, the election in December 2013 of Matteo Renzi as PD secretary brought significant leadership tensions within the PD to public view. After several weeks of denying that he would seek a change, Renzi publicly challenged Letta for the position of prime minister on 13 February 2014. Letta quickly lost the support of his colleagues and resigned as prime minister on 22 February.[11] + +Following his resignation, Letta initially retired from politics, leaving Italy to accept appointment as dean of the School of International Affairs at Sciences Po in Paris.[12] In March 2021, the PD secretary Nicola Zingaretti resigned after growing tensions within the party.[13] Many prominent members of the party asked Letta to become the new leader; after a few days, Letta announced that he would return to Italy to accept the candidacy, and he was elected as new secretary by the national assembly on 14 March 2021.[14][15] On 4 October 2021, Letta was elected to the Chamber of Deputies for the Siena constituency.[16] He resigned on 20 December 2024.[17] to become Dean of IE University’s School of Politics, Economics and Global Affairs in Madrid, Spain.[18] + +Early life and education +Letta was born in Pisa, Tuscany, to Giorgio Letta, an Abruzzo-born professor of mathematics who taught probability theory at the University of Pisa, member of the Lincean Academy and of the National Academy of the Sciences, and Anna Banchi, born in Sassari and raised in Porto Torres of Tuscan and Sardinian origins.[19][20] Born into a numerous family, uncles on his father's side include the centre-right politician Gianni Letta, a close advisor of Silvio Berlusconi, and the archaeologist Cesare Letta, while one of his paternal aunts, Maria Teresa Letta, served as vice president of the Italian Red Cross;[19] a maternal great-uncle is the poet and playwright Gian Paolo Bazzoni.[20] + +After spending part of his childhood in Strasbourg,[21] Letta completed his schooling in Italy at the liceo classico Galileo Galilei in Pisa.[22] He has a degree in political science, which he received from the University of Pisa and subsequently obtained a PhD at the Sant'Anna School of Advanced Studies, a Graduate School with university status.[23][n 1] + +From 2001 to 2003, Letta was professor at the University Carlo Cattaneo near Varese, and then he taught at the Sant'Anna School in Pisa in 2003 and at the HEC Paris in 2004.[25] + +Political career + +Letta in 2001 + This article is part of +a series about +Enrico Letta +Political positions +Minister for the Community Policies (1998–99) +Minister of Industry (1999–2001) +Prime Minister of Italy (2013–14) +Democratic Party Secretary (2021–present) +Political career +2007 leadership electionLettiani360 Association +Prime Minister of Italy +2013 electionLetta CabinetGrand coalitionEuropean debt crisisMigrant crisis2013 Lampedusa shipwreckOperation Mare NostrumResignation +Secretary of the Democratic Party +Leadership2021 by-election2022 presidential election2022 government crisis2022 general election +Academic career +Sciences PoJacques Delors Institute + +vte +Letta, a Catholic,[26] began his political career in the Christian Democracy (DC),[27] the dominant centrist and Roman Catholic party, which ruled Italy for almost fifty years. From 1991 to 1995, Letta was president of the Youth of the European People's Party,[23] the official youth wing of the European People's Party, a European political party founded by national-level Christian democratic parties, including the Italian DC; he used his presidency to help strengthen long-term connections among a variety of centrist parties in Europe, and has since remained a convinced supporter of the European Union and European integration.[28][29] + +During the Ciampi Cabinet headed by Carlo Azeglio Ciampi in 1993 and 1994, Letta worked as chief of staff for the minister of foreign affairs, Beniamino Andreatta; Andreatta, a left-leaning Christian Democrat economist with whom Letta had already been collaborating in a think tank known as Agenzia di Ricerche e Legislazione (AREL), played a highly influential role in Letta's political career.[23][28] + +Following the collapse of the DC in 1994, Letta joined its immediate successor, the Italian People's Party (PPI); after serving as secretary general of the Euro Committee within the Ministry of Treasury from 1996 to 1997, he became deputy secretary of the party in 1997 and 1998, when it was fully allied with the centre-left.[30] In 1998, after the fall of Romano Prodi's first government, Letta was appointed Minister for the Community Policies in cabinet of Massimo D'Alema at the age of 32, becoming the youngest cabinet minister in post-war Italy.[27] + +In 1999, Letta became Minister of Industry, Commerce and Crafts in the second government of D'Alema; a position that he held until 2001, serving also in the cabinet of Giuliano Amato.[31] During Amato's government he held the role of Minister of Foreign Trade too.[32] + +In the 2001 Italian general election, Letta was elected to the Chamber of Deputies as a member of Democracy is Freedom – The Daisy, a newly formed centrist formation to which the Italian People's Party had joined.[30][33] In the following year, he was appointed national responsible for the economic policies of The Daisy.[34] + +In 2004, Letta was elected member of the European Parliament, with nearly 179,000 votes, within The Olive Tree list,[35] joining the Alliance of Liberals and Democrats for Europe (ALDE) group. As MEP he became a member of the Committee on Economic and Monetary Affairs.[36] Letta served also in the committee for relations with the Maghreb countries and the Arab Maghreb Union.[37] + +In 2006, Letta was re-elected to the Chamber of Deputies and was appointed Secretary of the Council of Ministers in the second government of Romano Prodi, thereby succeeding his uncle Gianni Letta who had held the same position in the outgoing cabinet of Silvio Berlusconi. In this post, he became the closest advisor of Prime Minister Prodi, becoming one of the most influential politicians within the government. However, Prodi's government fell after only two years following tensions within its majority caused by the resignation of the Minister of Justice, Clemente Mastella.[38][39] Following the 2008 Italian general election, which saw a victory of the centre-right, Letta returned the post to his uncle, when the Berlusconi IV Cabinet was sworn in.[28][29] + +Leadership election candidacy +Main article: 2007 Democratic Party (Italy) leadership election +In 2007, together with other The Daisy's members, Letta joined the Democratic Party (PD), the new centre-left party, born from the union between The Daisy and the Democrats of the Left.[40][41] Having been a founding member of the party, Letta run in the first leadership election, which was held as an open primary. He announced his candidacy in July 2007 through a YouTube video.[42] A few weeks after the announcement, he compared the PD to Wikipedia, stating: "As in Wikipedia, even in the PD each of the hundreds of thousands of members must bring their own contributions, their own skills, which in certain fields are certainly more important than mine and those of the other leaders of the centre-left."[43] In support of his candidacy, Letta founded the 360 Association, a centrist and Christian leftist group, mainly composed by former members of The Daisy.[44][45] + +Letta's candidacy was supported by prominent members of the Italian centre-left, like Francesco Cossiga, Paolo De Castro, Gianni Pittella, Vito De Filippo and many other former members of The Daisy.[46] Moreover, Letta's faction was composed by politicians considered close to Prime Minister Romano Prodi, a Christian leftist professor and founding father of the Italian centre-left.[47][48] However, Letta had to face the politician who, more than any other, had worked to the formation of the Democratic Party and who was unanimously considered the future leader of the centre-left, Walter Veltroni, the incumbent Mayor of Rome.[49] In the primary election, Veltroni won by a landslide with 75.8% of votes, followed by the former Minister of Health Rosy Bindi with 12.9% and Letta with 11.0%.[50] + +After the primary election, Veltroni appointed Letta as the national responsible for labour. In May 2008, after the defeat in the 2008 election, Letta was appointed Shadow Minister of Labour and Social Policies in the second and last Shadow Cabinet formed in Italy.[51] + +Deputy Secretary of the Democratic Party + +Letta during a convention of his 360 Association in 2012 +During the leadership election of 2009, Letta supported the eventual winner, the social-democrat Pier Luigi Bersani, being appointed Deputy Secretary by the party's national convention.[52] + +In June 2010, Letta organized a three-day meeting in Verona, during which he met, within its association, entrepreneurs and key leaders of Lega Nord, the largest party in Veneto and eastern Lombardy.[53][54] An opinion poll among northern Democrats, released during the "Nord Camp", showed that they were keener on an alliance with Lega Nord than Berlusconi's The People of Freedom.[55] Letta was praised both by Roberto Maroni and Umberto Bossi.[56] + +In the 2013 Italian general election, the centre-left alliance Italy Common Good led by Bersani won a clear majority of seats in the Chamber of Deputies, thanks to a majority bonus that has effectively trebled the number of seats assigned to the winning party, while in the popular vote, it narrowly defeated the centre-right alliance of former prime minister Berlusconi. Close behind, the new anti-establishment Five Star Movement of comedian Beppe Grillo became the third-strongest force, clearly ahead of the centrist coalition of outgoing Prime Minister Mario Monti. In the Senate, no political group or party won an outright majority, resulting in a hung parliament.[57][58] + +On 20 April 2013, when Bersani resigned as Secretary after the candidates for President of the Republic Franco Marini and Romano Prodi were defeated in the presidential election, the whole leadership of the PD, including Deputy Secretary Letta, resigned their positions. + +Prime Minister of Italy +Main article: Letta Cabinet +Government formation +Following five inconclusive ballots for the 2013 Italian presidential election, incumbent president Giorgio Napolitano accepted to be re-elected at the Quirinal Palace.[59] Eventually, Napolitano reluctantly agreed to serve for another term in order to safeguard the continuity of the country's institutions.[60][61] Napolitano was easily re-elected on 20 April 2013, receiving 738 of the 1007 possible votes, and was sworn in on 22 April 2013 after a speech when he asked for constitutional and electoral reforms.[62] + + +Letta with President Giorgio Napolitano in Rome, 2013 +After his re-election, Napolitano immediately began consultations with the chairmen of the Chamber of Deputies, Senate and political forces, after the failure of the previous attempt with Bersani, and the establishment of a panel of experts by the President himself (dubbed as wise men by the press), in order to outline priorities and formulate an agenda to deal with the persistent economic hardship and growing unemployment. On 24 April 2013, Enrico Letta was invited to form a government by President Napolitano, following weeks of political deadlock.[63] + +On 27 April, Letta formally accepted the task of leading a grand coalition government, with support from the centre-left Democratic Party, the centre-right People of Freedom (PdL) of Silvio Berlusconi and the centrist Civic Choice of outgoing PM Mario Monti. The government he formed became the first in the history of the Italian Republic to include representatives of all the major coalitions that had run in the latest election. His close relationship with his uncle, Gianni Letta, one of Berlusconi's most trusted advisors, was perceived as a way of overcoming the bitter hostility between the two opposing factions.[21][64] Letta appointed Angelino Alfano, secretary of the People of Freedom, as his Deputy Prime Minister. The new government was formally sworn-in as on 28 April.[65] During the swearing ceremony, a man fired gunshots outside Chigi Palace and wounded two Carabinieri.[66] The attacker, Luigi Preiti, was stopped and arrested; he declared that he wanted to kill politicians or at least to hit a "symbol of politics" and that he was forced by despair being unemployed and recently divorced.[67] + +On 29 April, Letta's government won the confidence vote in the Chamber with 453 votes in favour, 152 against and 17 abstentions.[68] On the following day, he won the confidence vote in Senate too, with 233 votes in favour, 59 against 18 abstentions.[69] In his first speech in front of the Parliament, Letta stressed "necessity to restore decency, sobriety and a sense of honour"; he also advocated for a reduction of politics' costs.[70] + +Economic policies + +Prime Minister Letta in 2013 +During his premiership, Letta had to face a serious socio-economic crisis caused by the Great Recession and the subsequent European debt crisis. In 2013, one of the major problems of the country was the huge youth unemployment, which was valued around 40%.[71] To face this issue, on 14 June 2013, Letta scheduled a summit at Chigi Palace with the ministers of the economy, finance and labour of Italy, Germany, France and Spain, to agree on common EU policies for reducing unemployment.[8] After a few weeks, during a press conference at the conclusion of the Council of the European Union in Brussels, Letta announced that Italy would receive 1.5 billion euros in EU funds to fight youth unemployment.[9] + +On 31 May, the Council of Ministers resolved to sponsor a bill to abolish party subsidies, which was widely considered a revolution in Italian politics and political parties, which heavily depended on public funds.[7] On 4 June, Letta, within his Minister of Economic Development, Flavio Zanonato and his Minister of the Environment, Andrea Orlando, announced the receivership of Ilva, one of the largest steel makers in Europe, for a duration of 36 months, appointing Enrico Bondi as receiver.[72] + +On 15 June, the government approved the so-called "Action Decree" on hiring policies enabling economic recovery.[73] The decree was later approved by the Parliament between July and August 2013 with a confidence vote. The reform was harshly criticized by the anti-establishment Five Star Movement.[74] On 29 August, the government abolished IMU, the Italian tax on real estate introduced by the technocratic government of Mario Monti, for primary homes and for farm buildings .[75] + +Immigration policies +See also: Operation Mare Nostrum +As a result of the Libyan and Syrian Civil Wars, a major problem faced by Letta upon becoming prime minister in 2013 was the high levels of illegal immigration to Italy.[76] + +On 3 October 2013, a boat carrying migrants from Libya to Italy sank off the Italian island of Lampedusa. It was reported that the boat had sailed from Misrata, Libya, but that many of the migrants were originally from Eritrea, Somalia and Ghana.[77][78][79] An emergency response involving the Italian Coast Guard resulted in the rescue of 155 survivors.[78] On 12 October it was reported that the confirmed death toll after searching the boat was 359, but that further bodies were still missing;[80] a figure of "more than 360" deaths was later reported, becoming the deadliest shipwreck occurred in the Mediterranean Sea.[81] + +After the Lampedusa tragedy, Prime Minister Letta decided to strengthen the national patrolling of Sicilian channel by authorizing Operation Mare Nostrum, a military and humanitarian operation whose purpose was to patrol the maritime border and provide relief to migrants. This operation had two main purposes: to safeguard life at sea and to combat the illegal smuggling of migrants.[82] The operation brought at least 150,000 migrants to Europe, mainly from Africa and the Middle East.[83] The operation ended a few months after the end of his premiership, on 31 October 2014.[84] + +Foreign policies + +Letta with the U.S. President Barack Obama in the Oval Office +A strong pro-Europeanist politician, Letta built up close relations with other prominent European leaders like Angela Merkel, who was the first foreign leader he met, just a few days after his sworn in, on 30 April.[85] Letta also built a warm relationship with the French President François Hollande, with whom he shared a common view on austerity policies, considered outdated to face the economic crisis; Letta and Hollande often stressed the necessity to increase the public expenditures in investments.[86] + +On 17 and 18 June, Letta participated in his first G8 summit at Lough Erne in Northern Ireland.[87] During the summit, Letta had his first bilateral meeting with the President of the United States, Barack Obama. On 17 October, Letta was invited to the White House by President Obama, who stated that he had been really impressed by the Italian Prime Minister and his reforms plan.[88] + +On 5 and 6 September, Letta took part in the G20 summit in Saint Petersburg. The summit was focused on the aftermath of the Syrian civil war. Letta advocated for a diplomatic resolution of the crisis promoted by the United Nations.[89] On 25 September, during his speech in front of the United Nations General Assembly, Letta asked a deep reform of the UN Security Council.[90] + +September 2013 government crisis +On 28 September 2013, five ministers of The People of Freedom resigned on the orders of their leader, Silvio Berlusconi, pointing to the decision to postpone the decree that prevented the increase of the VAT from 21 to 22%, thus opening a government crisis.[91] On the following day, Letta had a meeting with President Napolitano to discuss the possible alternatives to solve the crisis. The head of State stressed that he would dissolve parliament only if there were no other possible alternatives.[92] + + +Letta with Angelino Alfano and Giorgio Napolitano in December 2013 +In the following days, dozens of members of PdL prepared to defy Berlusconi and vote in favour of the government, prompting him to announce that he would back the Prime Minister.[93][94][95] On 2 October, the government received 235 votes in favor and 70 against in the Senate, and 435 in favor and 162 against in the Chamber of Deputies.[96][97] Letta could thus continue his grand coalition government.[98] + +On 23 November, the Senate had to vote about the expulsion of Berlusconi from the Parliament, due to a conviction of tax fraud by the court of final instance and the Court of Cassation, which occurred a few months before.[99] Because he had been sentenced to a gross imprisonment for more than two years, the Senate voted to expel him from the Parliament, barring him from serving in any legislative office for six years.[100][101] + +After his expulsion from the Parliament, Berlusconi, who disbanded the PdL a few days before re-founding Forza Italia party, withdrew his support to the government. However, the interior minister Angelino Alfano did not follow his former leader, founding, along with other ministers and many members of the parliament, the New Centre-Right party, remaining in government.[102] The government later won key confidence votes in December 2013, with 173 votes in favour in the Senate and 350 in the Chamber.[103] + +On 26 January 2014, the Minister of Agriculture, Nunzia De Girolamo, resigned from her post due to claims of improper conduct linked to a scandal in the local healthcare system of her hometown, Benevento.[104][105] Her resignation was accepted by Letta on the following day, who took the ministerial role ad interim.[106] + +Resignation +On 8 December 2013, the Mayor of Florence, Matteo Renzi, won the Democratic Party leadership election by a landslide, immediately starting rumours about the possibility of becoming the new prime minister.[107] On 17 January 2014, while on air at Le invasioni barbariche on La7 TV channel, interviewed about tensions between him and Prime Minister Letta, Renzi tweeted the hashtag #enricostaisereno ("Enrico don't worry") to reassure his party colleague that he was not plotting anything against him.[108] + + +Letta with Matteo Renzi and President Napolitano in October 2013 +The growing criticism of the slow pace of Italian economic reform left Letta increasingly isolated within his own party.[109] At a PD's meeting on 13 February 2014, the Democratic Party leadership voted heavily in favour of Renzi's motion for "a new government, a new phase and a radical programme of reforms". Minutes after the party backed Renzi's proposal by 136 votes to 16, with two abstentions, Letta went to the Quirinal Palace, for a bilateral meeting with President Napolitano.[11] + +In an earlier speech, Renzi had paid tribute to Letta, saying that he did not intend to put him "on trial". But, without directly proposing himself as the next prime minister, he said the Eurozone's third-largest economy urgently needed "a new phase" and "radical programme" to push through badly-needed reforms. The motion he put forward made clear "the necessity and urgency of opening a new phase with a new executive". Speaking privately to party leaders, Renzi said that Italy was "at a crossroads" and faced either holding fresh elections or a new government without a return to the polls.[110] + +On 14 February, Letta resigned from the office of prime minister.[111] Following Letta's resignation, Renzi received the task of forming a new government from President Napolitano on 17 February,[112] and was formally sworn in as prime minister on 22 February.[113] + +Academic career + +Letta speaking at the Jacques Delors Institute in 2016 +In 2015, Letta resigned as a member of the Chamber of Deputies, after having voted against the new electoral law proposed by Prime Minister Renzi; at the same time, he announced that he would not renew the PD's membership.[114] + +In April 2015, Letta moved to Paris to teach at the Sciences Po, a higher education institute of political science. Since 1 September, he became dean of the Paris School of International Affairs (PSIA) of the same institute.[115] Along with his commitment to Sciences Po, he also had teaching periods at the University of Technology Sydney and the School of Global Policy and Strategy at the University of California, San Diego. In the same year, Letta launched Scuola di Politiche (School of Politics), a course of political science for young Italians.[116] + +In 2016, Letta supported the constitutional reform proposed by Renzi to reduce the powers of the Senate.[117] In the same year, along with the Jacques Delors Institute, he launched a school of political science focused on European issues, known as Académie Notre Europe.[118] In October 2017, he joined the new Comitè Action Publique 2022, a public commission for the reform of state and public administration in France which was strongly supported by President Emmanuel Macron.[119] + + +Letta with François Hollande and Jean-Claude Juncker in 2016 +In March 2019, following the victory of Nicola Zingaretti in the PD leadership election, Letta announced that he would re-join the party after four years.[120] In the same year, Letta also served on the advisory board of the annual Human Development Report of the United Nations Development Programme (UNDP), co-chaired by Thomas Piketty and Tharman Shanmugaratnam.[121] In 2020, he spoke in favour of the constitutional reform to reduce the number of MPs, considering it the first step to overcome perfect bicameralism.[122] + +Following his retirement from politics, Letta became advisor of many corporations and international organizations like Abertis, where he became member of the Board of Directors in 2016,[123][124] Amundi, in which he served as member of the Global Advisory Board since 2016,[125] the Eurasia Group, of which he has been Senior Advisor since 2016,[126] Publicis, where he served within the International Advisory Board since 2019[127] and Tikehau Capital, of which he became member of the International Advisory Board.[128] + +Letta is a member of many no-profit organizations like the International Gender Champions (IGC),[129] the British Council, Re-Imagine Europa,[130] the Trilateral Commission, in which he presided the European Group,[131] the Aspen Institute Italia, in which he served in the Executive Committee,[132] Associazione Italia ASEAN, of which he became chairman[133] and the Institut de Prospective Economique du Monde Méditerranéen (IPEMED).[134]. + +Letta was appointed Dean of IE School of Politics, Economics and Global Affairs. Letta will replace Manuel Muñiz, the current Provost of IE University and Charmain of the Board of IE New York College. He will join IE University on November 20.[135] + +Secretary of the Democratic Party + +Letta speaking at the European Parliament during the memorial for David Sassoli, in January 2022 +In January 2021, after the government crisis which forced Prime Minister Giuseppe Conte to resign, a national unity government led by Mario Draghi was formed.[136] In the midst of the formation of Draghi's government, Zingaretti was heavily criticized by the party's minority for his management of the crisis and strenuous support to Conte. On 4 March, after weeks of internal turmoil, Zingaretti announced his resignation as secretary, stating that he was "ashamed of the power struggles" within the party.[137] + +In the next days, many prominent members of the PD, including Zingaretti himself, but also former prime minister Paolo Gentiloni, former party secretary Dario Franceschini and President of Emilia-Romagna Stefano Bonaccini, publicly asked former Letta to become the new leader of the party.[138][139] Following an initial reluctance, Letta stated that he needed a few days to evaluate the option.[140] On 12 March, he officially accepted his candidacy as new party's leader.[141][142] On 14 March, the national assembly of the PD elected Letta secretary with 860 votes in favour, 2 against and 4 abstentions.[143][144] + +On 17 March, Letta appointed Peppe Provenzano and Irene Tinagli as his deputy secretaries.[145] On the following day, he appointed the party's new executive, composed of eight men and eight women.[146] Later that month, Letta forced the two Democratic leaders in Parliament, Graziano Delrio and Andrea Marcucci, to resign and proposed the election of two female leaders.[147] On 25 and 30 March, senators and deputies elected Simona Malpezzi and Debora Serracchiani as their leaders in the Senate and in the Chamber.[148][149] + + +Letta with Giuseppe Conte and the Finnish PM Sanna Marin in 2022 +In July 2021, Letta announced his intention to run for the Chamber of Deputies in the Siena constituency, which remained vacant after the resignation of Pier Carlo Padoan. On 4 October, Letta won the by-election with 49.9% of votes, returning to the Parliament after six years.[150] In the concurrent local elections, the PD and its allies won municipal elections in Milan, Bologna, Naples, Rome, Turin and many other major cities across the country.[151] + +As leader of the third political force in the parliament, Letta played an important role in the re-election of incumbent president Sergio Mattarella. On 23 January 2022, during Fabio Fazio's talk show Che tempo che fa, Letta stated that his favourable candidates for the presidency were Mario Draghi and Sergio Mattarella.[152] On the morning of 29 January, after the fall of all other possible candidacies, Letta asked the other leaders to follow "the Parliament's wisdom", referring to the massive support that Mattarella had received in the previous ballots.[153] On the same day, all the main parties asked Mattarella to serve for a second term. Despite his initial firm denial, Mattarella accepted the nomination[154] and was re-elected with 759 votes.[155] + +In July 2022, tensions arose within the governing majority, especially between Giuseppe Conte, leader of the Five Star Movement, and Prime Minister Draghi. Letta, who was trying to form a broad centre-left coalition with the M5S in the following election, was particularly critical of the possibility of a government crisis.[156] On 13 July, Conte announced that the M5S would revoke its support to the national unity government regarding the so-called decreto aiuti (English: aid decree), concerning economic stimulus to contrast the ongoing energy crisis, opening a political crisis within the majority.[157] On the following day, the M5S abstained and Prime Minister Draghi, despite having won the confidence vote, resigned.[158] However, the resignation was rejected by President Mattarella.[159] On the same day, Letta stressed that a government crisis needed to be officially opened in the Parliament, adding that "Italy deserved to stand with a strong personality like that of PM Draghi and the team that was around him."[160] However, on 21 July, Draghi resigned again after a new confidence vote in the Senate failed to pass with an absolute majority, following the defections of M5S, Lega, and Forza Italia;[161][162] A snap election was called for 25 September 2022.[163] + +After the 2022 general election, Enrico Letta conceded defeat and announced that he would not stand at the congress to elect the new party secretary.[164][165][166][167] He was succeeded by Elly Schlein, following the election on 26 February 2023.[168] + +Personal life +Letta is married to Gianna Fregonara, an Italian journalist, with whom he had three children, Giacomo, Lorenzo and Francesco.[169] + +Letta is known to be fond of listening to Dire Straits and playing Subbuteo;[170] he is also an avid supporter of A.C. Milan.[171] In addition to his native Italian, Letta speaks French and English fluently.[29] + +Electoral history +Election House Constituency Party Votes Result +2001 Chamber of Deputies Piedmont 1 DL –[a] check Elected +2004 European Parliament North-East Italy Ulivo 178,707 check Elected +2006 Chamber of Deputies Lombardy 1 Ulivo –[a] check Elected +2008 Chamber of Deputies Lombardy 2 PD –[a] check Elected +2013 Chamber of Deputies Marche PD –[a] check Elected +2021 Chamber of Deputies Siena PD 33,391 check Elected +2022 Chamber of Deputies Lombardy 1 PD –[a] check Elected + Elected in a closed list proportional representation system. +First-past-the-post elections +2021 Italian by-election (C): Siena +Candidate Party Votes % +Enrico Letta Centre-left coalition 33,391 49.9 +Tommaso Marrocchesi Marzi Centre-right coalition 25,303 37.8 +Others 8,191 12.3 +Total 66,885 100.0 +References + Quirinale, il governo di Letta giura davanti a Napolitano, Il Fatto Quotidiano + Letta eletto segretario: "Serve un nuovo Pd aperto, non partito del potere", Sky Tg24 + Enrico Letta, Enciclopedia Treccani + Italian Parliament Website LETTA Enrico – PD Retrieved 24 April 2013 + Nuovo governo, incarico a Enrico Letta. Napolitano: "I media cooperino", Il Fatto Quotidiano + "Letta: Grande coalizione, bisogna farsene una ragione". Archived from the original on 8 October 2016. Retrieved 28 January 2019. + Tre canali di finanziamento, più trasparenza. Ecco punto per punto il ddl del governo, Corriere della Sera + Vertice lavoro, Letta ai ministri europei: «Non c'è più tempo, si deve agire subito Scelta sciagurata guardare solo i conti» – Il Messaggero Archived 16 June 2013 at the Wayback Machine. Ilmessaggero.it. Retrieved on 24 August 2013. + Letta: all'Italia 1,5 miliardi per il lavoro. Grillo «poteva mandare tutto in vacca», Corriere della Sera + Letta: perché difendo Mare Nostrum, Avvenire + "Letta al Quirinale, si è dimesso – Top News". Retrieved 12 July 2016. + Enrico Letta, Sciences Po + Pd, Zingaretti si dimette. Dice addio il decimo segretario in 14 anni, Il Sole 24 Ore + Letta, il giorno della scelta. Zingaretti: rilancerà il Pd, il manifesto + Letta: "Non vi serve un nuovo segretario, ma un nuovo Pd", Huffington Post + Elezioni suppletive Siena: vince Letta, La Stampa + https://www.ansa.it/sito/notizie/politica/2024/12/20/in-aula-alla-camera-si-votano-le-dimissioni-di-enrico-letta_7a395834-ba1c-4567-bcfe-b3e3f499f045.html + Popova, Maria (1 October 2024). "Enrico Letta, new dean of the Faculty of Politics and Economics of the IE University of Segovia". Top Buzz Times. Retrieved 2 October 2024. + Motta, Nino (2 February 2013). "Un Letta per ogni stagione". Il Centro. + "Gli zii di Enrico Letta. Non solo Gianni: c'è anche Gian Paolo Bazzoni a Porto Torres". Sardinia Post. 25 April 2013. Retrieved 2 June 2013. + Winfield, Nicole (24 April 2013). "Enrico Letta Appointed Italian Prime Minister, Asked To Form Government". The Huffington Post. Retrieved 4 May 2013. + Letta, Enrico (2013). "Curriculum Vitae" (PDF). Archived from the original (PDF) on 11 June 2013. Retrieved 3 June 2013. + "Enrico Letta: la bio del giovane dalla grande esperienza". Huffington Post (in Italian). 24 August 2013. Retrieved 3 June 2013. + "Su esecutivo marchio scuola Sant'Anna: Pisa Letta si e' specializzato, Carrozza e' stato rettore" (in Italian). ANSA. 27 April 2013. Retrieved 11 June 2013. + Governo. Enrico Letta, l’allievo di Andreatta diventa presidente del Consiglio, Il Giornale + Enrico Marro (24 April 2013). "Chi è Enrico Letta? Quel giovane cattolico moderato, con agganci in tutto il Transatlantico. Nipote di Gianni. E fan di Mandela". Il Sole-24 Ore (in Italian). Milan. Retrieved 6 December 2022. + "Profile: Enrico Letta". BBC News. 24 April 2013. Retrieved 3 June 2013. + Povoledo, Elisabetta (28 April 2013). "An Italian Leader and a Political Acrobat". The New York Times. Retrieved 3 June 2013. + Dinmore, Guy (24 April 2013). "Italy's Enrico Letta a party loyalist and bridge-builder". Financial Times. + Sachelli, Orlando (24 April 2013). "Enrico Letta, il giovane Dc che deve far da paciere tra Pd e Pdl". Il Giornale (in Italian). Retrieved 7 June 2013. + Enrico Letta, Il Sole 24 Ore + DDL presentati dal Ministero del commercio con l'estero (Governo Amato-II), Parlamento + "Pisano, milanista, baby-ministro. Ecco chi è Enrico Letta, l'eterno "giovane" del Pd". Libero (in Italian). 24 April 2013. + Enrico Letta, Biografie Online + Elezioni Europee del 2004. Circoscrizione Italia Nord-Orientale, Dipartimento per gli Affari Interni + "European Parliament Website". European Parliament. Retrieved 7 May 2013. + Members of the European Parliament, European Parliament + "Mastella to Drop Support for Prodi, Favors Elections", Bloomberg, 21 January 2008. + "Italy PM in cabinet crisis talks", BBC News, 21 January 2008. + Vespa, Bruno (2010). Il Cuore e la Spada: Storia politica e romantica dell'Italia unita, 1861–2011. Mondadori. p. 650. ISBN 9788852017285. + Augusto, Giuliano (8 December 2013), "De profundis per il Pd", Rinascita, archived from the original on 1 March 2014 + Pd: Letta: «Mi candido». Video sul web, Corriere della Sera + Letta leader? Senza grinta, Il Fatto Quotidiano + "Associazione 360". Archived from the original on 20 June 2008. Retrieved 3 July 2008. + "Noi". Associazione Trecento Sessanta. Archived from the original on 14 March 2010. Retrieved 10 March 2010. + De Castro: "Candidatura di Letta grande occasione per coinvolgere la società Archived 27 September 2007 at the Wayback Machine, Enrico Letta + "DemocraticiPerLetta.info – Home". Archived from the original on 6 October 2008. Retrieved 3 July 2008. + "cantieredemocratico.it – - Notizie: Pd: per Veltroni tre liste nazionali". Archived from the original on 7 January 2009. Retrieved 3 July 2008. + "Rome Mayor Set to Win Left's Leadership". Associated Press. 14 October 2007. Retrieved 15 October 2007.[permanent dead link] + "Veltroni stravince con il 76% ma è la festa dei cittadini elettori". la Repubblica (in Italian). 14 October 2007. + Partito Democratico Archived 2008-04-30 at the Wayback Machine + "Pd, Bersani indica la rotta "Noi, partito dell'alternativa"". Quotidiano.net (in Italian). 9 September 2009. Retrieved 26 April 2013. + "Il Pd "sale" al Nord. E dialoga con Maroni". Archiviostorico.corriere.it. Retrieved 17 July 2014. + "Letta accoglie Maroni "Con la Lega si deve parlare"". Archiviostorico.corriere.it. Retrieved 17 July 2014. + "L' elettore del Nord: il Pdl? Meglio allearsi con la Lega". Archiviostorico.corriere.it. Retrieved 17 July 2014. + "Bossi loda Letta: giusto il dialogo sa che con noi si vince alle urne". Archiviostorico.corriere.it. Retrieved 17 July 2014. + "Italian election results: gridlock likely – as it happened". The Guardian. 26 February 2013. Retrieved 27 February 2013. + "Italy struggles with 'nightmare' election result". BBC News. 26 February 2013. Retrieved 27 February 2013. + "Italy crisis: President Giorgio Napolitano re-elected". BBC News. 20 April 2013. Retrieved 20 April 2013. + Mackenzie, James (20 April 2013). "Giorgio Napolitano, Italy's reluctant president". Bloomberg L.P. Retrieved 21 April 2013. + Napolitano, Giorgio; Scalfari, Eugenio (9 June 2013). "Napolitano si racconta a Scalfari: 'La mia vita, da comunista a Presidente'" (Video, at 59 min). La Repubblica (in Italian). Retrieved 9 June 2013. + The critical findings on electoral law echoed in the words that the head of state gave 22 April 2013 before the Electoral College that had re-elected him for a second term: Buonomo, Giampiero (2013). "Porcellum, premio di maggioranza a rischio". Golem Informazione. Archived from the original on 11 December 2019. Retrieved 11 March 2021. + Frye, Andrew (24 April 2013). "Letta Named Italian Prime Minister as Impasse Ends". Bloomberg. Retrieved 26 April 2013. + "Bridge-builder Enrico Letta seals Silvio Berlusconi deal". The Australian. 29 April 2013. Retrieved 8 June 2013. + Nasce il governo Letta, ora la fiducia. Il premier: «Sobria soddisfazione», Corriere della Sera + "New Italian 'grand coalition' government sworn in". BBC News. 28 April 2013. Retrieved 28 April 2013. + Sparatoria Palazzo Chigi: due carabinieri feriti. L’attentatore: "Puntavo ai politici", Il Fatto Quotidiano + Letta: «Abbiamo un'ultima possibilità. Basta debiti scaricati sulle future generazioni», Corriere della Sera + Governo Letta, fiducia anche al Senato, Corriere della Sera + Governo Letta, fiducia alla Camera: 453 sì, 153 no. Si astiene la Lega, Il Fatto Quotidiano + Disoccupazione giovanile, Bce: "Nel 2013 in Italia è arrivata vicina al 40%", Il Fatto Quotidiano + Ilva, firmato il decreto: Enrico Bondi commissario per 36 mesi, Il Fatto Quotidiano + Il Decreto del fare, misura per misura – Europa Quotidiano Archived 19 June 2013 at the Wayback Machine. Europaquotidiano.it (16 June 2013). Retrieved on 24 August 2013. + La Camera approva il «decreto del fare», Corriere della Sera + Abolizione IMU 2013, ecco cosa cambia per "prima casa", edilizia e terreni agricoli, EdilTecnico + Letta da Malta: " Orgoglio per l'operazione Mare Nostrum", Rai News + Pianigiani, Gaia (3 October 2013). "Scores of Migrants Dead After Boat Sinks Off Sicily". The New York Times. Siracusa. Retrieved 3 October 2013. + "Dozens of migrants die in Italy boat sinking near Lampedusa". BBC News. 3 October 2013. Retrieved 3 October 2013. + "Witness: Boat migrants used bottles to stay afloat". USA Today. 4 October 2013. Retrieved 4 October 2013. + "Mediterranean 'a cemetery' – Maltese PM Muscat". BBC News. 12 October 2013. Retrieved 12 October 2013. + "Lampedusa boat tragedy: Migrants 'raped and tortured'". BBC News. 8 November 2013. Retrieved 8 November 2013. + "Mare Nostrum Operation". Ministry of Defence of Italy. Retrieved 16 April 2015. + "IOM Applauds Italy's Life-Saving Mare Nostrum Operation: "Not a Migrant Pull Factor"". International Organization for Migration. 31 October 2014. Archived from the original on 16 April 2015. Retrieved 16 April 2015. + Ella Ide (31 October 2014). "Italy ignores pleas, ends boat migrant rescue operation". Yahoo! News. Retrieved 16 April 2015. + Letta, tour in Europa: vertice con Merkel. La cancelliera: «Italia sulla buona strada», Il Fatto Quotidiano + Ue, asse Letta-Hollande per la crescita, Corriere della Sera + G8, il debutto di Enrico Letta Prima l'incontro con Obama L'incognita Siria divide già – Quotidiano Net. Quotidiano.net. Retrieved on 24 August 2013. + Usa, Obama riceve Letta: "Italia sulla strada giusta, impressionato da premier", la Repubblica + Siria, Enrico Letta: "Una soluzione politica con l'Onu è ancora possibile. Strada stretta, ma fondamentale", Huffington Post + Letta a Wall Street: "Siamo affidabili". E all'Onu chiede riforma Consiglio sicurezza, la Repubblica + "Berlusconi fa dimettere ministri: è crisi. Letta: gesto folle per motivi personali". Repubblica.it. 28 September 2013. Retrieved 13 February 2014. + "Napolitano: "Verifico possibilità legislatura". Caos nel Pdl. Alfano: "No a estremismi"". Repubblica.it. 29 September 2013. Retrieved 13 February 2014. + Berlusconi U-turn secures Italian government survival + "Italian PM wins confidence vote after Berlusconi abandons revolt - as it happens". The Guardian. 2 October 2013. Archived from the original on 27 March 2023. + Italy crisis: PM Letta wins vote after Berlusconi U-turn + "Irrevocabili dimissioni ministri Pdl – Politica". ANSA.it. 28 September 2013. Retrieved 13 February 2014. + "Letta mercoledì a Camera e Senato – Politica". ANSA.it. 29 September 2013. Retrieved 13 February 2014. + "Berlusconi si arrende, Letta ottiene fiducia Napolitano: "Ora basta giochi al massacro"". Repubblica.it. 16 November 2013. Retrieved 13 February 2014. + Parks, Tim (24 August 2013). "Holding Italy Hostage". The New York Review of Books. Archived from the original on 25 October 2013. Retrieved 6 September 2013. + Italy's Senate expels ex-PM Silvio Berlusconi, BBC, 27 November 2013. Archived 30 November 2013 at the Wayback Machine + "Berlusconi vows to stay in politics as ban approaches". Reuters. 18 September 2013. Archived from the original on 14 October 2013. Retrieved 18 September 2013. + james mackenzie (3 December 2013). "Italy PM Letta to seek new confidence vote on December 11". The Star. Malaysia. Archived from the original on 7 December 2013. Retrieved 13 February 2014. + Letta incassa la fiducia, ma è bagarre in aula. E la Lega perde un pezzo, la Repubblica + James MacKenzie (26 January 2014). "Italy minister resigns, adding to headaches for government". Reuters. Rome. Retrieved 29 January 2014. + "Italy's agriculture minister resigns, blow to govt". Seattle Pi. 26 January 2014. Retrieved 29 January 2014. + "Premier accepts agriculture minister's resignation". La Gazzetta del Mezzogiorno. 27 January 2014. Archived from the original on 2 July 2015. Retrieved 29 January 2014. + Primarie PD 2013, Partito Democratico + Renzi: quando assicurava di non voler prendere il posto di Letta, Corriere della Sera + "Napolitano accepts Letta's resignation as Italian prime minister". Euronews. 14 February 2014. Archived from the original on 14 February 2014. Retrieved 14 February 2014. + Lizzy Davies in Rome. "Italian PM Enrico Letta to resign". The Guardian. Retrieved 13 February 2014. + Правительственный кризис в Италии: премьер Летта ушел в отставку (in Russian). RIA Novosti. 14 February 2014. Retrieved 14 February 2014. + "39 Year Old Matteo Renzi becomes, at 39, Youngest Italian Prime Minister". IANS. news.biharprabha.com. Retrieved 17 February 2014. + "Matteo Renzi sworn in as Italy's new PM in Rome ceremony". BBC. 22 February 2014. Retrieved 26 February 2014. + Enrico Letta si dimette da deputato: il discorso in aula e il lungo applauso della Camera, Huffington Post + "Enrico Letta, New Dean of PSIA". SciencesPo News. 21 April 2014. Retrieved 10 March 2017. + "Scuola di Politiche", Scuoladipolitiche.eu. Retrieved 4 February 2022. + Letta: «Italicum legge sbagliata. Ma al referendum io voterò Sì», Corriere della Sera + Letta battezza Académie Notre Europe: "Per creare una classe dirigente europea ed europeista", Huffington Post + Macron chiama Letta a far parte della Commissione per la riforma dello Stato, Il Giornale + Enrico Letta: "Dopo 5 anni riprendo la tessera del Pd. Mai più partito dell’antipatia", la Repubblica + 2019 Human Development Report Advisory Board Members United Nations Development Programme (UNDP). + Referendum, Letta: "Voterò Sì convintamente. Tutte le nostre proposte di riforma prevedevano lo stesso taglio. 630 deputati? Ne bastano 400", Il Fatto Quotidiano + "Abertis' Board of Directors appoints Luis Fortuño and Enrico Letta as new directors". Abertis.com. Archived from the original on 16 August 2018. Retrieved 16 August 2018. + Polizzi, Daniela. "Ai Benetton le autostrade spagnole Accordo Atlantia-Hochtief su Abertis". Corriere della Sera (in Italian). Retrieved 16 August 2018. + Amundi creates a Global Advisory Board with world-renowned experts in global economic and political issues Amundi, press release of 31 May 2016. + Former Italian Prime Minister Enrico Letta joins Eurasia Group as Senior Advisor Eurasia Group, press release of 8 March 2016. + Supervisory Board Publicis, press release of 7 March 2019. + International Advisory Board Archived 4 January 2021 at the Wayback Machine Tikehau Capital. + Members International Gender Champions (IGC). + Advisory Board Re-Imagine Europa. + Membership Trilateral Commission. + "Comitato Esecutivo". Aspen Institute Italia. Archived from the original on 9 October 2010. Retrieved 26 April 2013. + About Us Archived 25 November 2018 at the Wayback Machine Associazione Italia ASEAN. + Governance Institut de Prospective Economique du Monde Méditerranéen (IPEMED), Paris. + https://www.ie.edu/school-politics-economics-global-affairs/news/enrico-letta-former-italian-prime-minister-appointed-dean-ie-school-politics-economics-global-affairs/ + "Mario Draghi sworn in as Italy's new prime minister". BBC News. 13 February 2021. + "Zingaretti quits as chief of Italy's Democratic party over infighting". Financial Times. 4 March 2021. Archived from the original on 7 March 2021. Retrieved 12 March 2021. + "Zingaretti: "Letta può rendere il Pd protagonista indiscusso della democrazia italiana"" (in Italian). Il Foglio. 12 March 2021. + ""Dobbiamo salvare il Pd". Così Franceschini lavora per Letta" (in Italian). Il Foglio. 9 March 2021. + "Letta takes time to consider taking lead of PD – English". ANSA.it. 10 March 2021. Retrieved 10 March 2021. + "Pd, Letta sarà il nuovo segretario. Il tweet: "Io ci sono, chiedo voto sulla base delle mie parole". Ecco il programma dell'Assemblea di domenica" (in Italian). La Repubblica. 12 March 2021. + "Enrico Letta, Italian ex-PM, poised for political comeback". Politico Europe. 12 March 2021. + Pd, Letta segretario con 860 sì: "Serve un nuovo Pd. Priorità a lavoro, giovani e donne". Promette battaglia sul voto ai sedicenni e Ius soli. E sulle alleanze: "Sentirò 5S e Renzi", la Repubblica + First speech as candidate secretary of the Italian Partito Democratico (in Italian). Archived from the original on 13 December 2021. + Provenzano e Tinagli, il cacciavite di Letta funziona, Huffington Post + Pd, Letta nomina la nuova segreteria del partito: sedici membri, otto uomini e otto donne, la Repubblica + Pd, Letta: "Nominiamo due donne capigruppo alla Camera e al Senato". Delrio: "Agito sempre per parità", la Repubblica + Pd, Simona Malpezzi è la nuova capogruppo al Senato. E alla Camera vacilla l'ipotesi Serracchiani, la Repubblica + Debora Serracchiani capogruppo Pd alla Camera, ANSA + Letta vince a Siena le suppletive, ANSA + Risultati ballottaggi del 17 e 18 ottobre. A Roma e Torino trionfa il centrosinistra. A Trieste vince il centrodestra, la Repubblica + Quirinale, la proposta di Letta: "Draghi o Mattarella, il bis sarebbe il massimo", la Repubblica + "L'assist di Letta la Mattarella-bis". Corriere della Sera (in Italian). 29 January 2022. Retrieved 29 January 2022. + "Mattarella to be re-elected after saying he is 'willing'". ANSA. 29 January 2022. Archived from the original on 29 January 2022. Retrieved 30 January 2022. + "Elezioni Presidente della repubblica 2022". La Repubblica (in Italian). 29 January 2022. Archived from the original on 29 January 2022. Retrieved 28 January 2022. + Letta: "Evitiamo il colpo di pistola di Sarajevo, se cade il governo si va al voto", Huffington Post + "Italy's government on the brink as 5-Star threatens to boycott confidence vote". Guardian. 13 July 2022. Retrieved 13 July 2022. + Italian Prime Minister Mario Draghi says he’ll resign, government faces collapse, Washington Post + Mattarella respinge dimissioni Draghi e manda premier a Camere, ANSA + Governo in bilico, Letta: "La crisi si apre in Parlamento". Anche la Lega valuta la verifica di maggioranza: cosa significa, la Repubblica + Horowitz, Jason (20 July 2022). "Draghi Government Falls Apart, Returning Turbulent Politics to Italy". The New York Times. ISSN 0362-4331. Archived from the original on 21 July 2022. Retrieved 21 July 2022. + "Italy in limbo as Draghi wins confidence vote but loses parliamentary majority". France 24. Agence-France Press. 20 July 2022. Archived from the original on 20 July 2022. Retrieved 21 July 2022. + Borghese, Livia; Braithwaite, Sharon; Fox, Kara; Latza Nadeau, Barbie; Ruotolo, Nicola (21 July 2022). "Italy's president dissolves parliament, triggering snap election following Draghi's resignation". CNN. Archived from the original on 21 July 2022. Retrieved 22 July 2022. + "«Letta si dimette sotto questa percentuale». E i big già lo archiviano: è caccia al nuovo segretario". 6 September 2022. + "Come una mucca nel corridoio. Il congresso Pd si piazza sul palco di Letta". 23 September 2022. + "Letta pensa alle elezioni, ma il Pd pensa al congresso". + "Pd: Letta, giovedì 6 ottobre direzione sul congresso". 28 September 2022. + Nova, Redazione Agenzia (26 February 2023). "Elly Schlein è la nuova segretaria del Partito democratico". Agenzia Nova (in Italian). Retrieved 26 February 2023. + "Enrico Letta Profile: Mild-Mannered AC Milan Fan who is Italy's Next PM". International Business Times. 24 April 2013. Retrieved 30 April 2013. + Kington, Tom (24 April 2013). "Enrico Letta to become youngest Italian prime minister in 25 years". The Daily Telegraph. Retrieved 4 May 2013. + Tra la passione per la politica, l'Ue e il Milan, chi è Enrico Letta, AGI – Agenzia Italiana +Notes + + It is not altogether clear whether the Doctorate degree was obtained in international law in 1997 as reported in his curriculum vitae,[22] or in political science in 1999 as reported by ANSA.[24] +External links + +Wikimedia Commons has media related to Enrico Letta. +Personal profile of Enrico Letta in the European Parliament's database of members +Declaration (PDF) of financial interests (in Italian) +Political offices +Preceded by +Lamberto Dini +Minister for the Community Policies +1998–1999 Succeeded by +Patrizia Toia +Preceded by +Pier Luigi Bersani +Minister of Industry, Commerce and Crafts +1999–2001 Succeeded by +Antonio Marzano +as Minister of Productive Activities +Preceded by +Gianni Letta +Secretary of the Council of Ministers +2006–2008 Succeeded by +Gianni Letta +Preceded by +Mario Monti +Prime Minister of Italy +2013–2014 Succeeded by +Matteo Renzi +Party political offices +Preceded by +Dario Franceschini +Deputy Secretary of the Democratic Party +2009–2013 Succeeded by +Debora Serracchiani +Succeeded by +Lorenzo Guerini +Preceded by +Nicola Zingaretti +Secretary of the Democratic Party +2021–2023 Succeeded by +Elly Schlein +Enrico Letta +Authority control databases Edit this at Wikidata +Categories: 1966 birthsLiving peoplePeople from PisaItalian Roman CatholicsChristian Democracy (Italy) politiciansItalian People's Party (1994) politiciansDemocracy is Freedom – The Daisy politiciansPrime ministers of ItalyGovernment ministers of ItalyMinisters of agriculture of ItalyDeputies of Legislature XIV of ItalyDeputies of Legislature XV of ItalyDeputies of Legislature XVI of ItalyDeputies of Legislature XVII of ItalyLetta CabinetDemocratic Party (Italy) MEPsMEPs for Italy 2004–2009University of Pisa alumniSant'Anna School of Advanced Studies alumniLeaders of political parties in Italy \ No newline at end of file diff --git a/tests/test_managers.py b/tests/test_managers.py index 6331e266..7ed9eaa5 100644 --- a/tests/test_managers.py +++ b/tests/test_managers.py @@ -28,6 +28,7 @@ from letta.constants import ( BASE_VOICE_SLEEPTIME_CHAT_TOOLS, BASE_VOICE_SLEEPTIME_TOOLS, BUILTIN_TOOLS, + FILES_TOOLS, LETTA_TOOL_EXECUTION_DIR, LETTA_TOOL_SET, MCP_TOOL_TAG_NAME_PREFIX, @@ -2535,6 +2536,8 @@ async def test_upsert_base_tools(server: SyncServer, default_user, event_loop): assert t.tool_type == ToolType.LETTA_VOICE_SLEEPTIME_CORE elif t.name in BUILTIN_TOOLS: assert t.tool_type == ToolType.LETTA_BUILTIN + elif t.name in FILES_TOOLS: + assert t.tool_type == ToolType.LETTA_FILES_CORE else: pytest.fail(f"The tool name is unrecognized as a base tool: {t.name}") assert t.source_code is None @@ -6007,7 +6010,7 @@ async def test_attach_is_idempotent(server, default_user, sarah_agent, default_f @pytest.mark.asyncio async def test_update_file_agent(server, file_attachment, default_user): - updated = await server.file_agent_manager.update_file_agent( + updated = await server.file_agent_manager.update_file_agent_by_id( agent_id=file_attachment.agent_id, file_id=file_attachment.file_id, actor=default_user, @@ -6018,6 +6021,19 @@ async def test_update_file_agent(server, file_attachment, default_user): assert updated.visible_content == "updated" +@pytest.mark.asyncio +async def test_update_file_agent_by_file_name(server, file_attachment, default_user): + updated = await server.file_agent_manager.update_file_agent_by_name( + agent_id=file_attachment.agent_id, + file_name=file_attachment.file_name, + actor=default_user, + is_open=False, + visible_content="updated", + ) + assert updated.is_open is False + assert updated.visible_content == "updated" + + @pytest.mark.asyncio async def test_mark_access(server, file_attachment, default_user): old_ts = file_attachment.last_accessed_at @@ -6031,7 +6047,7 @@ async def test_mark_access(server, file_attachment, default_user): file_id=file_attachment.file_id, actor=default_user, ) - refreshed = await server.file_agent_manager.get_file_agent( + refreshed = await server.file_agent_manager.get_file_agent_by_id( agent_id=file_attachment.agent_id, file_id=file_attachment.file_id, actor=default_user, @@ -6087,7 +6103,7 @@ async def test_detach_file(server, file_attachment, default_user): file_id=file_attachment.file_id, actor=default_user, ) - res = await server.file_agent_manager.get_file_agent( + res = await server.file_agent_manager.get_file_agent_by_id( agent_id=file_attachment.agent_id, file_id=file_attachment.file_id, actor=default_user, diff --git a/tests/test_sources.py b/tests/test_sources.py index df290fa8..9001742f 100644 --- a/tests/test_sources.py +++ b/tests/test_sources.py @@ -9,6 +9,8 @@ from letta_client import CreateBlock from letta_client import Letta as LettaSDKClient from letta_client.types import AgentState +from letta.schemas.message import MessageCreate +from tests.helpers.utils import retry_until_success from tests.utils import wait_for_server # Constants @@ -35,11 +37,15 @@ def client() -> LettaSDKClient: wait_for_server(server_url) print("Running client tests with server:", server_url) client = LettaSDKClient(base_url=server_url, token=None) + client.tools.upsert_base_tools() yield client @pytest.fixture def agent_state(client: LettaSDKClient): + open_file_tool = client.tools.list(name="open_file")[0] + close_file_tool = client.tools.list(name="close_file")[0] + agent_state = client.agents.create( memory_blocks=[ CreateBlock( @@ -49,12 +55,10 @@ def agent_state(client: LettaSDKClient): ], model="openai/gpt-4o-mini", embedding="openai/text-embedding-ada-002", + tool_ids=[open_file_tool.id, close_file_tool.id], ) yield agent_state - # delete agent - client.agents.delete(agent_id=agent_state.id) - @pytest.mark.parametrize( "file_path, expected_value, expected_label_regex", @@ -218,3 +222,148 @@ def test_delete_source_removes_source_blocks_correctly(client: LettaSDKClient, a assert len(blocks) == 0 assert "test" not in [b.value for b in blocks] assert not any(re.fullmatch(r"test_[a-z0-9]+\.txt", b.label) for b in blocks) + + +@retry_until_success(max_attempts=5, sleep_time_seconds=2) +def test_agent_uses_open_close_file_correctly(client: LettaSDKClient, agent_state: AgentState): + print(f"Starting test with agent ID: {agent_state.id}") + + # Clear existing sources + existing_sources = client.sources.list() + print(f"Found {len(existing_sources)} existing sources, clearing...") + for source in existing_sources: + print(f" Deleting source: {source.id}") + client.sources.delete(source_id=source.id) + + # Clear existing jobs + existing_jobs = client.jobs.list() + print(f"Found {len(existing_jobs)} existing jobs, clearing...") + for job in existing_jobs: + print(f" Deleting job: {job.id}") + client.jobs.delete(job_id=job.id) + + # Create a new source + print("Creating new source...") + source = client.sources.create(name="test_source", embedding="openai/text-embedding-ada-002") + print(f"Created source with ID: {source.id}") + + sources_list = client.sources.list() + assert len(sources_list) == 1 + print(f"✓ Verified source creation - found {len(sources_list)} source(s)") + + # Attach source to agent + print(f"Attaching source {source.id} to agent {agent_state.id}...") + client.agents.sources.attach(source_id=source.id, agent_id=agent_state.id) + print("✓ Source attached to agent") + + # Load files into the source + file_path = "tests/data/long_test.txt" + print(f"Uploading file: {file_path}") + + # Upload the files + with open(file_path, "rb") as f: + job = client.sources.files.upload(source_id=source.id, file=f) + + print(f"File upload job created with ID: {job.id}, initial status: {job.status}") + + # Wait for the jobs to complete + while job.status != "completed": + print(f"Waiting for job {job.id} to complete... Current status: {job.status}") + time.sleep(1) + job = client.jobs.retrieve(job_id=job.id) + + print(f"✓ Job completed successfully with status: {job.status}") + + # Get uploaded files + print("Retrieving uploaded files...") + files = client.sources.files.list(source_id=source.id, limit=1) + assert len(files) == 1 + assert files[0].source_id == source.id + file = files[0] + print(f"✓ Found uploaded file: {file.file_name} (ID: {file.id})") + + # Check that file is opened initially + print("Checking initial agent state...") + agent_state = client.agents.retrieve(agent_id=agent_state.id) + blocks = agent_state.memory.file_blocks + print(f"Agent has {len(blocks)} file block(s)") + if blocks: + initial_content_length = len(blocks[0].value) + print(f"Initial file content length: {initial_content_length} characters") + print(f"First 100 chars of content: {blocks[0].value[:100]}...") + assert initial_content_length > 10, f"Expected file content > 10 chars, got {initial_content_length}" + print("✓ File appears to be initially loaded") + + # Ask agent to close the file + print(f"Requesting agent to close file: {file.file_name}") + close_response = client.agents.messages.create( + agent_id=agent_state.id, + messages=[MessageCreate(role="user", content=f"Use ONLY the close_file tool to close the file named {file.file_name}")], + ) + print(f"Close file request sent, got {len(close_response.messages)} message(s) in response") + print(close_response.messages) + + # Check that file is closed + print("Verifying file is closed...") + agent_state = client.agents.retrieve(agent_id=agent_state.id) + blocks = agent_state.memory.file_blocks + closed_content_length = len(blocks[0].value) if blocks else 0 + print(f"File content length after close: {closed_content_length} characters") + assert closed_content_length == 0, f"Expected empty content after close, got {closed_content_length} chars" + print("✓ File successfully closed") + + # Ask agent to open the file for a specific range + start, end = 0, 5 + print(f"Requesting agent to open file for range [{start}, {end}]") + open_response1 = client.agents.messages.create( + agent_id=agent_state.id, + messages=[ + MessageCreate( + role="user", content=f"Use ONLY the open_file tool to open the file named {file.file_name} for view range [{start}, {end}]" + ) + ], + ) + print(f"First open request sent, got {len(open_response1.messages)} message(s) in response") + print(open_response1.messages) + + # Check that file is opened + print("Verifying file is opened with first range...") + agent_state = client.agents.retrieve(agent_id=agent_state.id) + blocks = agent_state.memory.file_blocks + old_value = blocks[0].value + old_content_length = len(old_value) + print(f"File content length after first open: {old_content_length} characters") + print(f"First range content: '{old_value}'") + assert old_content_length > 10, f"Expected content > 10 chars for range [{start}, {end}], got {old_content_length}" + print("✓ File successfully opened with first range") + + # Ask agent to open the file for a different range + start, end = 5, 10 + print(f"Requesting agent to open file for different range [{start}, {end}]") + open_response2 = client.agents.messages.create( + agent_id=agent_state.id, + messages=[ + MessageCreate( + role="user", content=f"Use ONLY the open_file tool to open the file named {file.file_name} for view range [{start}, {end}]" + ) + ], + ) + print(f"Second open request sent, got {len(open_response2.messages)} message(s) in response") + print(open_response2.messages) + + # Check that file is opened, but for different range + print("Verifying file is opened with second range...") + agent_state = client.agents.retrieve(agent_id=agent_state.id) + blocks = agent_state.memory.file_blocks + new_value = blocks[0].value + new_content_length = len(new_value) + print(f"File content length after second open: {new_content_length} characters") + print(f"Second range content: '{new_value}'") + assert new_content_length > 10, f"Expected content > 10 chars for range [{start}, {end}], got {new_content_length}" + + print(f"Comparing content ranges:") + print(f" First range [0, 5]: '{old_value}'") + print(f" Second range [5, 10]: '{new_value}'") + + assert new_value != old_value, f"Different view ranges should have different content. New: '{new_value}', Old: '{old_value}'" + print("✓ File successfully opened with different range - content differs as expected")