diff --git a/letta/agents/base_agent.py b/letta/agents/base_agent.py index 8a793088..3500fba3 100644 --- a/letta/agents/base_agent.py +++ b/letta/agents/base_agent.py @@ -1,11 +1,13 @@ from abc import ABC, abstractmethod -from typing import Any, AsyncGenerator, Optional, Union +from typing import Any, AsyncGenerator, List, Optional, Union import openai from letta.schemas.enums import MessageStreamStatus -from letta.schemas.letta_message import LegacyLettaMessage, LettaMessage, UserMessage +from letta.schemas.letta_message import LegacyLettaMessage, LettaMessage +from letta.schemas.letta_message_content import TextContent from letta.schemas.letta_response import LettaResponse +from letta.schemas.message import MessageCreate from letta.schemas.user import User from letta.services.agent_manager import AgentManager from letta.services.message_manager import MessageManager @@ -33,7 +35,7 @@ class BaseAgent(ABC): self.actor = actor @abstractmethod - async def step(self, input_message: UserMessage, max_steps: int = 10) -> LettaResponse: + async def step(self, input_messages: List[MessageCreate], max_steps: int = 10) -> LettaResponse: """ Main execution loop for the agent. """ @@ -41,15 +43,24 @@ class BaseAgent(ABC): @abstractmethod async def step_stream( - self, input_message: UserMessage, max_steps: int = 10 + self, input_messages: List[MessageCreate], max_steps: int = 10 ) -> AsyncGenerator[Union[LettaMessage, LegacyLettaMessage, MessageStreamStatus], None]: """ Main streaming execution loop for the agent. """ raise NotImplementedError - def pre_process_input_message(self, input_message: UserMessage) -> Any: + def pre_process_input_message(self, input_messages: List[MessageCreate]) -> Any: """ Pre-process function to run on the input_message. """ - return input_message.model_dump() + + def get_content(message: MessageCreate) -> str: + if isinstance(message.content, str): + return message.content + elif message.content and len(message.content) == 1 and isinstance(message.content[0], TextContent): + return message.content[0].text + else: + return "" + + return [{"role": input_message.role, "content": get_content(input_message)} for input_message in input_messages] diff --git a/letta/agents/ephemeral_agent.py b/letta/agents/ephemeral_agent.py index 91e89343..d951b434 100644 --- a/letta/agents/ephemeral_agent.py +++ b/letta/agents/ephemeral_agent.py @@ -5,9 +5,8 @@ import openai from letta.agents.base_agent import BaseAgent from letta.schemas.agent import AgentState from letta.schemas.enums import MessageRole -from letta.schemas.letta_message import UserMessage from letta.schemas.letta_message_content import TextContent -from letta.schemas.message import Message +from letta.schemas.message import Message, MessageCreate from letta.schemas.openai.chat_completion_request import ChatCompletionRequest from letta.schemas.user import User from letta.services.agent_manager import AgentManager @@ -37,15 +36,15 @@ class EphemeralAgent(BaseAgent): actor=actor, ) - async def step(self, input_message: UserMessage) -> List[Message]: + async def step(self, input_messages: List[MessageCreate]) -> List[Message]: """ Synchronous method that takes a user's input text and returns a summary from OpenAI. Returns a list of ephemeral Message objects containing both the user text and the assistant summary. """ agent_state = self.agent_manager.get_agent_by_id(agent_id=self.agent_id, actor=self.actor) - input_message = self.pre_process_input_message(input_message=input_message) - request = self._build_openai_request([input_message], agent_state) + openai_messages = self.pre_process_input_message(input_messages=input_messages) + request = self._build_openai_request(openai_messages, agent_state) chat_completion = await self.openai_client.chat.completions.create(**request.model_dump(exclude_unset=True)) @@ -66,7 +65,7 @@ class EphemeralAgent(BaseAgent): ) return openai_request - async def step_stream(self, input_message: UserMessage) -> AsyncGenerator[str, None]: + async def step_stream(self, input_messages: List[MessageCreate]) -> AsyncGenerator[str, None]: """ This agent is synchronous-only. If called in an async context, raise an error. """ diff --git a/letta/agents/ephemeral_memory_agent.py b/letta/agents/ephemeral_memory_agent.py index 03fc64e0..af765d02 100644 --- a/letta/agents/ephemeral_memory_agent.py +++ b/letta/agents/ephemeral_memory_agent.py @@ -7,9 +7,8 @@ from letta.helpers.tool_execution_helper import enable_strict_mode from letta.orm.enums import ToolType from letta.schemas.agent import AgentState from letta.schemas.enums import MessageRole -from letta.schemas.letta_message import UserMessage from letta.schemas.letta_message_content import TextContent -from letta.schemas.message import Message +from letta.schemas.message import Message, MessageCreate from letta.schemas.openai.chat_completion_request import ChatCompletionRequest, Tool from letta.schemas.user import User from letta.services.agent_manager import AgentManager @@ -38,15 +37,15 @@ class EphemeralMemoryAgent(BaseAgent): actor=actor, ) - async def step(self, input_message: UserMessage) -> List[Message]: + async def step(self, input_messages: List[MessageCreate]) -> List[Message]: """ Synchronous method that takes a user's input text and returns a summary from OpenAI. Returns a list of ephemeral Message objects containing both the user text and the assistant summary. """ agent_state = self.agent_manager.get_agent_by_id(agent_id=self.agent_id, actor=self.actor) - input_message = self.pre_process_input_message(input_message=input_message) - request = self._build_openai_request([input_message], agent_state) + openai_messages = self.pre_process_input_message(input_messages=input_messages) + request = self._build_openai_request(openai_messages, agent_state) chat_completion = await self.openai_client.chat.completions.create(**request.model_dump(exclude_unset=True)) @@ -57,7 +56,8 @@ class EphemeralMemoryAgent(BaseAgent): ) ] - def pre_process_input_message(self, input_message: UserMessage) -> Dict: + def pre_process_input_message(self, input_messages: List[MessageCreate]) -> List[Dict]: + input_message = input_messages[0] input_prompt_augmented = f""" You are a memory recall agent whose job is to comb through a large set of messages and write relevant memories in relation to a user query. Your response will directly populate a "memory block" called "human" that describes the user, that will be used to answer more questions in the future. @@ -78,9 +78,7 @@ class EphemeralMemoryAgent(BaseAgent): Your response: """ - input_message.content = input_prompt_augmented - # print(input_prompt_augmented) - return input_message.model_dump() + return [{"role": "user", "content": input_prompt_augmented}] def _format_messages_llm_friendly(self): messages = self.message_manager.list_messages_for_agent(agent_id=self.agent_id, actor=self.actor) @@ -107,7 +105,7 @@ class EphemeralMemoryAgent(BaseAgent): return [Tool(type="function", function=enable_strict_mode(t.json_schema)) for t in tools] - async def step_stream(self, input_message: UserMessage) -> AsyncGenerator[str, None]: + async def step_stream(self, input_messages: List[MessageCreate]) -> AsyncGenerator[str, None]: """ This agent is synchronous-only. If called in an async context, raise an error. """ diff --git a/letta/agents/helpers.py b/letta/agents/helpers.py index 9f614510..9460a7ce 100644 --- a/letta/agents/helpers.py +++ b/letta/agents/helpers.py @@ -1,11 +1,11 @@ -from typing import Dict, List, Tuple +from typing import List, Tuple from letta.schemas.agent import AgentState from letta.schemas.letta_response import LettaResponse -from letta.schemas.message import Message +from letta.schemas.message import Message, MessageCreate from letta.schemas.usage import LettaUsageStatistics from letta.schemas.user import User -from letta.server.rest_api.utils import create_user_message +from letta.server.rest_api.utils import create_input_messages from letta.services.message_manager import MessageManager @@ -20,13 +20,13 @@ def _create_letta_response(new_in_context_messages: list[Message], use_assistant def _prepare_in_context_messages( - input_message: Dict, agent_state: AgentState, message_manager: MessageManager, actor: User + input_messages: List[MessageCreate], agent_state: AgentState, message_manager: MessageManager, actor: User ) -> Tuple[List[Message], List[Message]]: """ Prepares in-context messages for an agent, based on the current state and a new user input. Args: - input_message (Dict): The new user input message to process. + input_messages (List[MessageCreate]): The new user input messages to process. agent_state (AgentState): The current state of the agent, including message buffer config. message_manager (MessageManager): The manager used to retrieve and create messages. actor (User): The user performing the action, used for access control and attribution. @@ -46,7 +46,7 @@ def _prepare_in_context_messages( # Create a new user message from the input and store it new_in_context_messages = message_manager.create_many_messages( - [create_user_message(input_message=input_message, agent_id=agent_state.id, actor=actor)], actor=actor + create_input_messages(input_messages=input_messages, agent_id=agent_state.id, actor=actor), actor=actor ) return current_in_context_messages, new_in_context_messages diff --git a/letta/agents/letta_agent.py b/letta/agents/letta_agent.py index 128a1073..a158c10e 100644 --- a/letta/agents/letta_agent.py +++ b/letta/agents/letta_agent.py @@ -18,12 +18,11 @@ from letta.local_llm.constants import INNER_THOUGHTS_KWARG from letta.log import get_logger from letta.orm.enums import ToolType from letta.schemas.agent import AgentState -from letta.schemas.enums import MessageStreamStatus +from letta.schemas.enums import MessageRole, MessageStreamStatus from letta.schemas.letta_message import AssistantMessage from letta.schemas.letta_message_content import OmittedReasoningContent, ReasoningContent, RedactedReasoningContent, TextContent from letta.schemas.letta_response import LettaResponse -from letta.schemas.message import Message, MessageUpdate -from letta.schemas.openai.chat_completion_request import UserMessage +from letta.schemas.message import Message, MessageCreate, MessageUpdate from letta.schemas.openai.chat_completion_response import ToolCall from letta.schemas.user import User from letta.server.rest_api.utils import create_letta_messages_from_llm_response @@ -60,11 +59,10 @@ class LettaAgent(BaseAgent): self.use_assistant_message = use_assistant_message @trace_method - async def step(self, input_message: UserMessage, max_steps: int = 10) -> LettaResponse: - input_message = self.pre_process_input_message(input_message) + async def step(self, input_messages: List[MessageCreate], max_steps: int = 10) -> LettaResponse: agent_state = self.agent_manager.get_agent_by_id(self.agent_id, actor=self.actor) current_in_context_messages, new_in_context_messages = _prepare_in_context_messages( - input_message, agent_state, self.message_manager, self.actor + input_messages, agent_state, self.message_manager, self.actor ) tool_rules_solver = ToolRulesSolver(agent_state.tool_rules) llm_client = LLMClient.create( @@ -96,16 +94,15 @@ class LettaAgent(BaseAgent): @trace_method async def step_stream( - self, input_message: UserMessage, max_steps: int = 10, use_assistant_message: bool = False + self, input_messages: List[MessageCreate], max_steps: int = 10, use_assistant_message: bool = False ) -> AsyncGenerator[str, None]: """ Main streaming loop that yields partial tokens. Whenever we detect a tool call, we yield from _handle_ai_response as well. """ - input_message = self.pre_process_input_message(input_message) agent_state = self.agent_manager.get_agent_by_id(self.agent_id, actor=self.actor) current_in_context_messages, new_in_context_messages = _prepare_in_context_messages( - input_message, agent_state, self.message_manager, self.actor + input_messages, agent_state, self.message_manager, self.actor ) tool_rules_solver = ToolRulesSolver(agent_state.tool_rules) llm_client = LLMClient.create( @@ -362,7 +359,9 @@ class LettaAgent(BaseAgent): f"{message}" ) - letta_response = await letta_agent.step(UserMessage(content=augmented_message)) + letta_response = await letta_agent.step( + [MessageCreate(role=MessageRole.system, content=[TextContent(text=augmented_message)])] + ) messages = letta_response.messages send_message_content = [message.content for message in messages if isinstance(message, AssistantMessage)] diff --git a/letta/agents/voice_agent.py b/letta/agents/voice_agent.py index 16f8ff97..a0601183 100644 --- a/letta/agents/voice_agent.py +++ b/letta/agents/voice_agent.py @@ -19,8 +19,9 @@ from letta.log import get_logger from letta.orm.enums import ToolType from letta.schemas.agent import AgentState from letta.schemas.block import BlockUpdate +from letta.schemas.letta_message_content import TextContent from letta.schemas.letta_response import LettaResponse -from letta.schemas.message import Message, MessageUpdate +from letta.schemas.message import Message, MessageCreate, MessageUpdate from letta.schemas.openai.chat_completion_request import ( AssistantMessage, ChatCompletionRequest, @@ -34,8 +35,8 @@ from letta.schemas.user import User from letta.server.rest_api.utils import ( convert_letta_messages_to_openai, create_assistant_messages_from_openai_response, + create_input_messages, create_letta_messages_from_llm_response, - create_user_message, ) from letta.services.agent_manager import AgentManager from letta.services.block_manager import BlockManager @@ -93,19 +94,18 @@ class VoiceAgent(BaseAgent): agent_id=agent_id, openai_client=openai_client, message_manager=message_manager, agent_manager=agent_manager, actor=actor ) - async def step(self, input_message: UserMessage, max_steps: int = 10) -> LettaResponse: + async def step(self, input_messages: List[MessageCreate], max_steps: int = 10) -> LettaResponse: raise NotImplementedError("LowLatencyAgent does not have a synchronous step implemented currently.") - async def step_stream(self, input_message: UserMessage, max_steps: int = 10) -> AsyncGenerator[str, None]: + async def step_stream(self, input_messages: List[MessageCreate], max_steps: int = 10) -> AsyncGenerator[str, None]: """ Main streaming loop that yields partial tokens. Whenever we detect a tool call, we yield from _handle_ai_response as well. """ - input_message = self.pre_process_input_message(input_message) agent_state = self.agent_manager.get_agent_by_id(self.agent_id, actor=self.actor) in_context_messages = self.message_manager.get_messages_by_ids(message_ids=agent_state.message_ids, actor=self.actor) - letta_message_db_queue = [create_user_message(input_message=input_message, agent_id=agent_state.id, actor=self.actor)] - in_memory_message_history = [input_message] + letta_message_db_queue = [create_input_messages(input_messages=input_messages, agent_id=agent_state.id, actor=self.actor)] + in_memory_message_history = self.pre_process_input_message(input_messages) # TODO: Define max steps here for _ in range(max_steps): @@ -372,7 +372,7 @@ class VoiceAgent(BaseAgent): return f"Failed to call tool. Error: {e}", False async def _recall_memory(self, query, agent_state: AgentState) -> None: - results = await self.offline_memory_agent.step(UserMessage(content=query)) + results = await self.offline_memory_agent.step([MessageCreate(role="user", content=[TextContent(text=query)])]) target_block = next(b for b in agent_state.memory.blocks if b.label == self.summary_block_label) self.block_manager.update_block( block_id=target_block.id, block_update=BlockUpdate(value=results[0].content[0].text), actor=self.actor diff --git a/letta/server/rest_api/routers/v1/agents.py b/letta/server/rest_api/routers/v1/agents.py index fa601281..c6744b10 100644 --- a/letta/server/rest_api/routers/v1/agents.py +++ b/letta/server/rest_api/routers/v1/agents.py @@ -22,7 +22,6 @@ from letta.schemas.letta_request import LettaRequest, LettaStreamingRequest from letta.schemas.letta_response import LettaResponse from letta.schemas.memory import ContextWindowOverview, CreateArchivalMemory, Memory from letta.schemas.message import MessageCreate -from letta.schemas.openai.chat_completion_request import UserMessage from letta.schemas.passage import Passage, PassageUpdate from letta.schemas.run import Run from letta.schemas.source import Source @@ -610,9 +609,7 @@ async def send_message( actor=actor, ) - messages = request.messages - content = messages[0].content[0].text if messages and not isinstance(messages[0].content, str) else messages[0].content - result = await experimental_agent.step(UserMessage(content=content), max_steps=10) + result = await experimental_agent.step(request.messages, max_steps=10) else: result = await server.send_message_to_agent( agent_id=agent_id, @@ -672,10 +669,8 @@ async def send_message_streaming( actor=actor, ) - messages = request.messages - content = messages[0].content[0].text if messages and not isinstance(messages[0].content, str) else messages[0].content result = StreamingResponse( - experimental_agent.step_stream(UserMessage(content=content), max_steps=10, use_assistant_message=request.use_assistant_message), + experimental_agent.step_stream(request.messages, max_steps=10, use_assistant_message=request.use_assistant_message), media_type="text/event-stream", ) else: diff --git a/letta/server/rest_api/utils.py b/letta/server/rest_api/utils.py index a8aea959..af944ec1 100644 --- a/letta/server/rest_api/utils.py +++ b/letta/server/rest_api/utils.py @@ -19,7 +19,7 @@ from letta.helpers.datetime_helpers import get_utc_time from letta.log import get_logger from letta.schemas.enums import MessageRole from letta.schemas.letta_message_content import OmittedReasoningContent, ReasoningContent, RedactedReasoningContent, TextContent -from letta.schemas.message import Message +from letta.schemas.message import Message, MessageCreate from letta.schemas.usage import LettaUsageStatistics from letta.schemas.user import User from letta.server.rest_api.interface import StreamingServerInterface @@ -140,31 +140,29 @@ def log_error_to_sentry(e): sentry_sdk.capture_exception(e) -def create_user_message(input_message: dict, agent_id: str, actor: User) -> Message: +def create_input_messages(input_messages: List[MessageCreate], agent_id: str, actor: User) -> List[Message]: """ Converts a user input message into the internal structured format. """ - # Generate timestamp in the correct format - # Skip pytz for performance reasons - now = get_utc_time().isoformat() + new_messages = [] + for input_message in input_messages: + # Construct the Message object + new_message = Message( + id=f"message-{uuid.uuid4()}", + role=input_message.role, + content=input_message.content, + name=input_message.name, + otid=input_message.otid, + organization_id=actor.organization_id, + agent_id=agent_id, + model=None, + tool_calls=None, + tool_call_id=None, + created_at=get_utc_time(), + ) + new_messages.append(new_message) - # Format message as structured JSON - structured_message = {"type": "user_message", "message": input_message["content"], "time": now} - - # Construct the Message object - user_message = Message( - id=f"message-{uuid.uuid4()}", - role=MessageRole.user, - content=[TextContent(text=json.dumps(structured_message, indent=2))], # Store structured JSON - organization_id=actor.organization_id, - agent_id=agent_id, - model=None, - tool_calls=None, - tool_call_id=None, - created_at=get_utc_time(), - ) - - return user_message + return new_messages def create_letta_messages_from_llm_response( diff --git a/letta/services/summarizer/summarizer.py b/letta/services/summarizer/summarizer.py index a726ce01..45f7b01f 100644 --- a/letta/services/summarizer/summarizer.py +++ b/letta/services/summarizer/summarizer.py @@ -4,8 +4,8 @@ from typing import List, Tuple from letta.agents.base_agent import BaseAgent from letta.schemas.enums import MessageRole -from letta.schemas.message import Message -from letta.schemas.openai.chat_completion_request import UserMessage +from letta.schemas.letta_message_content import TextContent +from letta.schemas.message import Message, MessageCreate from letta.services.summarizer.enums import SummarizationMode @@ -95,8 +95,15 @@ class Summarizer: "It should be in note-taking format in natural English. You are to return the new, updated memory only." ) - messages = await self.summarizer_agent.step(UserMessage(content=summary_request_text)) - current_summary = "\n".join([m.content[0].text for m in messages]) + response = await self.summarizer_agent.step( + input_messages=[ + MessageCreate( + role=MessageRole.user, + content=[TextContent(text=summary_request_text)], + ), + ], + ) + current_summary = "\n".join([m.content[0].text for m in response.messages if m.message_type == "assistant_message"]) current_summary = f"{self.summary_prefix}{current_summary}" return updated_in_context_messages, current_summary, True diff --git a/tests/integration_test_experimental.py b/tests/integration_test_experimental.py index 71268b21..6ade107b 100644 --- a/tests/integration_test_experimental.py +++ b/tests/integration_test_experimental.py @@ -7,14 +7,15 @@ import httpx import openai import pytest from dotenv import load_dotenv -from letta_client import CreateBlock, Letta +from letta_client import CreateBlock, Letta, MessageCreate, TextContent from openai.types.chat.chat_completion_chunk import ChatCompletionChunk from letta.agents.letta_agent import LettaAgent from letta.schemas.embedding_config import EmbeddingConfig from letta.schemas.enums import MessageStreamStatus +from letta.schemas.letta_message_content import TextContent as LettaTextContent from letta.schemas.llm_config import LLMConfig -from letta.schemas.openai.chat_completion_request import UserMessage +from letta.schemas.message import MessageCreate as LettaMessageCreate from letta.schemas.tool import ToolCreate from letta.schemas.usage import LettaUsageStatistics from letta.services.agent_manager import AgentManager @@ -248,7 +249,7 @@ async def test_new_agent_loop(disable_e2b_api_key, openai_client, agent_state, m actor=actor, ) - response = await agent.step(UserMessage(content=message)) + response = await agent.step([LettaMessageCreate(role="user", content=[LettaTextContent(text=message)])]) @pytest.mark.asyncio @@ -265,7 +266,7 @@ async def test_rethink_tool(disable_e2b_api_key, openai_client, agent_state, mes ) assert "chicken" not in AgentManager().get_agent_by_id(agent_state.id, actor).memory.get_block("human").value - response = await agent.step(UserMessage(content=message)) + response = await agent.step([LettaMessageCreate(role="user", content=[LettaTextContent(text=message)])]) assert "chicken" in AgentManager().get_agent_by_id(agent_state.id, actor).memory.get_block("human").value @@ -275,9 +276,16 @@ async def test_multi_agent_broadcast(disable_e2b_api_key, client, openai_client, stale_agents = AgentManager().list_agents(actor=actor, limit=300) for agent in stale_agents: - client.delete_agent(agent_id=agent.id) + AgentManager().delete_agent(agent_id=agent.id, actor=actor) - manager_agent_state = client.create_agent(name=f"manager", include_base_tools=True, include_multi_agent_tools=True, tags=["manager"]) + manager_agent_state = client.agents.create( + name=f"manager", + include_base_tools=True, + include_multi_agent_tools=True, + tags=["manager"], + model="openai/gpt-4o", + embedding="letta/letta-free", + ) manager_agent = LettaAgent( agent_id=manager_agent_state.id, message_manager=MessageManager(), @@ -290,12 +298,31 @@ async def test_multi_agent_broadcast(disable_e2b_api_key, client, openai_client, tag = "subagent" workers = [] for idx in range(30): - workers.append(client.create_agent(name=f"worker_{idx}", include_base_tools=True, tags=[tag], tool_ids=[weather_tool.id])) + workers.append( + client.agents.create( + name=f"worker_{idx}", + include_base_tools=True, + tags=[tag], + tool_ids=[weather_tool.id], + model="openai/gpt-4o", + embedding="letta/letta-free", + ), + ) response = await manager_agent.step( - UserMessage( - content="Use the `send_message_to_agents_matching_tags` tool to send a message to agents with tag 'subagent' asking them to check the weather in Seattle.", - ) + [ + LettaMessageCreate( + role="user", + content=[ + LettaTextContent( + text=( + "Use the `send_message_to_agents_matching_tags` tool to send a message to agents with " + "tag 'subagent' asking them to check the weather in Seattle." + ) + ), + ], + ), + ] ) @@ -334,10 +361,14 @@ def test_multi_agent_broadcast_client(client: Letta, weather_tool): response = client.agents.messages.create( agent_id=supervisor.id, messages=[ - { - "role": "user", - "content": "Use the `send_message_to_agents_matching_tags` tool to send a message to agents with tag 'worker' asking them to check the weather in Seattle.", - } + MessageCreate( + role="user", + content=[ + TextContent( + text="Use the `send_message_to_agents_matching_tags` tool to send a message to agents with tag 'worker' asking them to check the weather in Seattle." + ) + ], + ) ], ) end = time.perf_counter() @@ -456,10 +487,10 @@ def test_anthropic_streaming(client: Letta): response = client.agents.messages.create_stream( agent_id=agent.id, messages=[ - { - "role": "user", - "content": "Use core memory append to append `banana` to the persona core memory.", - } + MessageCreate( + role="user", + content=[TextContent(text="Use the core memory append tool to append `banana` to the persona core memory.")], + ), ], stream_tokens=True, )