Files
letta-server/letta/schemas/message.py
Kian Jones 25d54dd896 chore: enable F821, F401, W293 (#9503)
* auto fixes

* auto fix pt2 and transitive deps and undefined var checking locals()

* manual fixes (ignored or letta-code fixed)

* fix circular import
2026-02-24 10:55:08 -08:00

2676 lines
120 KiB
Python

from __future__ import annotations
from letta.log import get_logger
logger = get_logger(__name__)
import copy
import json
import re
import uuid
from collections import OrderedDict
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Literal, Optional, Union
from openai.types.chat.chat_completion_message_tool_call import ChatCompletionMessageToolCall as OpenAIToolCall, Function as OpenAIFunction
from pydantic import BaseModel, Field, field_validator, model_validator
from letta.constants import DEFAULT_MESSAGE_TOOL, DEFAULT_MESSAGE_TOOL_KWARG, REQUEST_HEARTBEAT_PARAM, TOOL_CALL_ID_MAX_LEN
from letta.helpers.datetime_helpers import get_utc_time, is_utc_datetime
from letta.helpers.json_helpers import json_dumps
from letta.local_llm.constants import INNER_THOUGHTS_KWARG, INNER_THOUGHTS_KWARG_VERTEX
from letta.otel.tracing import trace_method
from letta.schemas.enums import MessageRole, PrimitiveType
from letta.schemas.letta_base import OrmMetadataBase
from letta.schemas.letta_message import (
ApprovalRequestMessage,
ApprovalResponseMessage,
ApprovalReturn,
AssistantMessage,
AssistantMessageListResult,
HiddenReasoningMessage,
LettaMessage,
LettaMessageReturnUnion,
LettaMessageSearchResult,
MessageType,
ReasoningMessage,
ReasoningMessageListResult,
SummaryMessage,
SystemMessage,
SystemMessageListResult,
ToolCall,
ToolCallMessage,
ToolReturn as LettaToolReturn,
ToolReturnMessage,
UserMessage,
UserMessageListResult,
extract_compaction_stats_from_packed_json,
)
from letta.schemas.letta_message_content import (
ImageContent,
ImageSourceType,
LettaMessageContentUnion,
LettaToolReturnContentUnion,
OmittedReasoningContent,
ReasoningContent,
RedactedReasoningContent,
SummarizedReasoningContent,
TextContent,
ToolCallContent,
ToolReturnContent,
get_letta_message_content_union_str_json_schema,
)
from letta.system import unpack_message
from letta.utils import parse_json, sanitize_tool_call_id, validate_function_response
def truncate_tool_return(content: Optional[str], limit: Optional[int]) -> Optional[str]:
if limit is None or content is None:
return content
if len(content) <= limit:
return content
return content[:limit] + f"... [truncated {len(content) - limit} chars]"
def _get_text_from_part(part: Union[TextContent, ImageContent, dict]) -> Optional[str]:
"""Extract text from a content part, returning None for images."""
if isinstance(part, TextContent):
return part.text
elif isinstance(part, dict) and part.get("type") == "text":
return part.get("text", "")
return None
def tool_return_to_text(func_response: Optional[Union[str, List]]) -> Optional[str]:
"""Convert tool return content to text, replacing images with placeholders."""
if func_response is None:
return None
if isinstance(func_response, str):
return func_response
text_parts = [text for part in func_response if (text := _get_text_from_part(part))]
image_count = sum(
1 for part in func_response if isinstance(part, ImageContent) or (isinstance(part, dict) and part.get("type") == "image")
)
result = "\n".join(text_parts)
if image_count > 0:
placeholder = "[Image omitted]" if image_count == 1 else f"[{image_count} images omitted]"
result = (result + " " + placeholder) if result else placeholder
return result if result else None
def add_inner_thoughts_to_tool_call(
tool_call: OpenAIToolCall,
inner_thoughts: str,
inner_thoughts_key: str,
) -> OpenAIToolCall:
"""Add inner thoughts (arg + value) to a tool call"""
try:
# load the args list
func_args = parse_json(tool_call.function.arguments)
# create new ordered dict with inner thoughts first
ordered_args = OrderedDict({inner_thoughts_key: inner_thoughts})
# update with remaining args
ordered_args.update(func_args)
# create the updated tool call (as a string)
updated_tool_call = copy.deepcopy(tool_call)
updated_tool_call.function.arguments = json_dumps(ordered_args)
return updated_tool_call
except json.JSONDecodeError as e:
logger.warning(f"Failed to put inner thoughts in kwargs: {e}")
raise e
class MessageCreateType(str, Enum):
message = "message"
approval = "approval"
tool_return = "tool_return"
class MessageCreateBase(BaseModel):
type: MessageCreateType = Field(..., description="The message type to be created.")
class MessageCreate(MessageCreateBase):
"""Request to create a message"""
type: Optional[Literal[MessageCreateType.message]] = Field(
default=MessageCreateType.message, description="The message type to be created."
)
# In the simplified format, only allow simple roles
role: Literal[
MessageRole.user,
MessageRole.system,
MessageRole.assistant,
] = Field(..., description="The role of the participant.")
content: Union[str, List[LettaMessageContentUnion]] = Field(
...,
description="The content of the message.",
json_schema_extra=get_letta_message_content_union_str_json_schema(),
)
name: Optional[str] = Field(default=None, description="The name of the participant.")
otid: Optional[str] = Field(default=None, description="The offline threading id associated with this message")
sender_id: Optional[str] = Field(default=None, description="The id of the sender of the message, can be an identity id or agent id")
batch_item_id: Optional[str] = Field(default=None, description="The id of the LLMBatchItem that this message is associated with")
group_id: Optional[str] = Field(default=None, description="The multi-agent group that the message was sent in")
def model_dump(self, to_orm: bool = False, **kwargs) -> Dict[str, Any]:
data = super().model_dump(**kwargs)
if to_orm and "content" in data:
if isinstance(data["content"], str):
data["content"] = [TextContent(text=data["content"])]
return data
class ApprovalCreate(MessageCreateBase):
"""Input to approve or deny a tool call request"""
type: Literal[MessageCreateType.approval] = Field(default=MessageCreateType.approval, description="The message type to be created.")
approvals: Optional[List[LettaMessageReturnUnion]] = Field(default=None, description="The list of approval responses")
approve: Optional[bool] = Field(None, description="Whether the tool has been approved", deprecated=True)
approval_request_id: Optional[str] = Field(None, description="The message ID of the approval request", deprecated=True)
reason: Optional[str] = Field(None, description="An optional explanation for the provided approval status", deprecated=True)
group_id: Optional[str] = Field(default=None, description="The multi-agent group that the message was sent in")
@model_validator(mode="after")
def migrate_deprecated_fields(self):
if not self.approvals and self.approve is not None and self.approval_request_id is not None:
self.approvals = [
ApprovalReturn(
tool_call_id=self.approval_request_id,
approve=self.approve,
reason=self.reason,
)
]
return self
class ToolReturnCreate(MessageCreateBase):
"""Submit tool return(s) from client-side tool execution.
This is the preferred way to send tool results back to the agent after
client-side tool execution. It is equivalent to sending an ApprovalCreate
with tool return approvals, but provides a cleaner API for the common case.
"""
type: Literal[MessageCreateType.tool_return] = Field(
default=MessageCreateType.tool_return, description="The message type to be created."
)
tool_returns: List[LettaToolReturn] = Field(
...,
description="List of tool returns from client-side execution",
)
MessageCreateUnion = Union[MessageCreate, ApprovalCreate, ToolReturnCreate]
class MessageUpdate(BaseModel):
"""Request to update a message"""
role: Optional[MessageRole] = Field(default=None, description="The role of the participant.")
content: Optional[Union[str, List[LettaMessageContentUnion]]] = Field(
default=None,
description="The content of the message.",
json_schema_extra=get_letta_message_content_union_str_json_schema(),
)
# NOTE: probably doesn't make sense to allow remapping user_id or agent_id (vs creating a new message)
# user_id: Optional[str] = Field(None, description="The unique identifier of the user.")
# agent_id: Optional[str] = Field(None, description="The unique identifier of the agent.")
# NOTE: we probably shouldn't allow updating the model field, otherwise this loses meaning
# model: Optional[str] = Field(None, description="The model used to make the function call.")
name: Optional[str] = Field(default=None, description="The name of the participant.")
# NOTE: we probably shouldn't allow updating the created_at field, right?
# created_at: Optional[datetime] = Field(None, description="The time the message was created.")
tool_calls: Optional[List[OpenAIToolCall,]] = Field(default=None, description="The list of tool calls requested.")
tool_call_id: Optional[str] = Field(default=None, description="The id of the tool call.")
def model_dump(self, to_orm: bool = False, **kwargs) -> Dict[str, Any]:
data = super().model_dump(**kwargs)
if to_orm and "content" in data:
if isinstance(data["content"], str):
data["content"] = [TextContent(text=data["content"])]
return data
class BaseMessage(OrmMetadataBase):
__id_prefix__ = PrimitiveType.MESSAGE.value
class Message(BaseMessage):
"""
Letta's internal representation of a message. Includes methods to convert to/from LLM provider formats.
Attributes:
id (str): The unique identifier of the message.
role (MessageRole): The role of the participant.
text (str): The text of the message.
user_id (str): The unique identifier of the user.
agent_id (str): The unique identifier of the agent.
model (str): The model used to make the function call.
name (str): The name of the participant.
created_at (datetime): The time the message was created.
tool_calls (List[OpenAIToolCall,]): The list of tool calls requested.
tool_call_id (str): The id of the tool call.
step_id (str): The id of the step that this message was created in.
otid (str): The offline threading id associated with this message.
tool_returns (List[ToolReturn]): The list of tool returns requested.
group_id (str): The multi-agent group that the message was sent in.
sender_id (str): The id of the sender of the message, can be an identity id or agent id.
conversation_id (str): The conversation this message belongs to.
t
"""
id: str = BaseMessage.generate_id_field()
agent_id: Optional[str] = Field(default=None, description="The unique identifier of the agent.")
model: Optional[str] = Field(default=None, description="The model used to make the function call.")
# Basic OpenAI-style fields
role: MessageRole = Field(..., description="The role of the participant.")
content: Optional[List[LettaMessageContentUnion]] = Field(default=None, description="The content of the message.")
# NOTE: in OpenAI, this field is only used for roles 'user', 'assistant', and 'function' (now deprecated). 'tool' does not use it.
name: Optional[str] = Field(
default=None,
description="For role user/assistant: the (optional) name of the participant. For role tool/function: the name of the function called.",
)
tool_calls: Optional[List[OpenAIToolCall]] = Field(
default=None, description="The list of tool calls requested. Only applicable for role assistant."
)
tool_call_id: Optional[str] = Field(default=None, description="The ID of the tool call. Only applicable for role tool.")
# Extras
step_id: Optional[str] = Field(default=None, description="The id of the step that this message was created in.")
run_id: Optional[str] = Field(default=None, description="The id of the run that this message was created in.")
otid: Optional[str] = Field(default=None, description="The offline threading id associated with this message")
tool_returns: Optional[List[ToolReturn]] = Field(default=None, description="Tool execution return information for prior tool calls")
group_id: Optional[str] = Field(default=None, description="The multi-agent group that the message was sent in")
sender_id: Optional[str] = Field(default=None, description="The id of the sender of the message, can be an identity id or agent id")
batch_item_id: Optional[str] = Field(default=None, description="The id of the LLMBatchItem that this message is associated with")
conversation_id: Optional[str] = Field(default=None, description="The conversation this message belongs to")
is_err: Optional[bool] = Field(
default=None, description="Whether this message is part of an error step. Used only for debugging purposes."
)
approval_request_id: Optional[str] = Field(
default=None, description="The id of the approval request if this message is associated with a tool call request."
)
approve: Optional[bool] = Field(default=None, description="Whether tool call is approved.")
denial_reason: Optional[str] = Field(default=None, description="The reason the tool call request was denied.")
approvals: Optional[List[ApprovalReturn | ToolReturn]] = Field(default=None, description="The list of approvals for this message.")
# This overrides the optional base orm schema, created_at MUST exist on all messages objects
created_at: datetime = Field(default_factory=get_utc_time, description="The timestamp when the object was created.")
# validate that run_id is set
# @model_validator(mode="after")
# def validate_run_id(self):
# if self.run_id is None:
# raise ValueError("Run ID is required")
# return self
@field_validator("role")
@classmethod
def validate_role(cls, v: str) -> str:
roles = ["system", "assistant", "user", "tool", "approval", "summary"]
assert v in roles, f"Role must be one of {roles}"
return v
def to_json(self):
json_message = vars(self)
if json_message["tool_calls"] is not None:
json_message["tool_calls"] = [vars(tc) for tc in json_message["tool_calls"]]
# turn datetime to ISO format
# also if the created_at is missing a timezone, add UTC
if not is_utc_datetime(self.created_at):
self.created_at = self.created_at.replace(tzinfo=timezone.utc)
json_message["created_at"] = self.created_at.isoformat()
json_message.pop("is_err", None) # make sure we don't include this debugging information
return json_message
@staticmethod
def generate_otid():
return str(uuid.uuid4())
@staticmethod
@trace_method
def to_letta_messages_from_list(
messages: List[Message],
use_assistant_message: bool = True,
assistant_message_tool_name: str = DEFAULT_MESSAGE_TOOL,
assistant_message_tool_kwarg: str = DEFAULT_MESSAGE_TOOL_KWARG,
reverse: bool = True,
include_err: Optional[bool] = None,
text_is_assistant_message: bool = False,
convert_summary_to_user: bool = True,
) -> List[LettaMessage]:
if use_assistant_message:
message_ids_to_remove = []
assistant_messages_by_tool_call = {
tool_call.id: msg
for msg in messages
if msg.role == MessageRole.assistant and msg.tool_calls
for tool_call in msg.tool_calls
}
for message in messages:
if (
message.role == MessageRole.tool
and message.tool_call_id in assistant_messages_by_tool_call
and assistant_messages_by_tool_call[message.tool_call_id].tool_calls
and assistant_message_tool_name
in [tool_call.function.name for tool_call in assistant_messages_by_tool_call[message.tool_call_id].tool_calls]
):
message_ids_to_remove.append(message.id)
messages = [msg for msg in messages if msg.id not in message_ids_to_remove]
# Convert messages to LettaMessages
return [
msg
for m in messages
for msg in m.to_letta_messages(
use_assistant_message=use_assistant_message,
assistant_message_tool_name=assistant_message_tool_name,
assistant_message_tool_kwarg=assistant_message_tool_kwarg,
reverse=reverse,
include_err=include_err,
text_is_assistant_message=text_is_assistant_message,
convert_summary_to_user=convert_summary_to_user,
)
]
@staticmethod
@trace_method
def to_letta_search_results_from_list(
search_results: List["MessageSearchResult"],
use_assistant_message: bool = True,
assistant_message_tool_name: str = DEFAULT_MESSAGE_TOOL,
assistant_message_tool_kwarg: str = DEFAULT_MESSAGE_TOOL_KWARG,
reverse: bool = True,
include_err: Optional[bool] = None,
text_is_assistant_message: bool = False,
convert_summary_to_user: bool = True,
) -> List[LettaMessageSearchResult]:
"""Convert MessageSearchResult objects into LettaMessageSearchResult objects.
This mirrors the behavior of to_letta_messages_from_list, but preserves the
originating Message.agent_id on each search result variant.
"""
letta_search_results: List[LettaMessageSearchResult] = []
for result in search_results:
message = result.message
# Convert the underlying Message into LettaMessage variants
letta_messages = message.to_letta_messages(
use_assistant_message=use_assistant_message,
assistant_message_tool_name=assistant_message_tool_name,
assistant_message_tool_kwarg=assistant_message_tool_kwarg,
reverse=reverse,
include_err=include_err,
text_is_assistant_message=text_is_assistant_message,
convert_summary_to_user=convert_summary_to_user,
)
for lm in letta_messages:
if isinstance(lm, SystemMessage):
letta_search_results.append(
SystemMessageListResult(
message_id=message.id,
message_type=lm.message_type,
content=lm.content,
agent_id=message.agent_id,
conversation_id=message.conversation_id,
created_at=message.created_at,
)
)
elif isinstance(lm, UserMessage):
letta_search_results.append(
UserMessageListResult(
message_id=message.id,
message_type=lm.message_type,
content=lm.content,
agent_id=message.agent_id,
conversation_id=message.conversation_id,
created_at=message.created_at,
)
)
elif isinstance(lm, ReasoningMessage):
letta_search_results.append(
ReasoningMessageListResult(
message_id=message.id,
message_type=lm.message_type,
reasoning=lm.reasoning,
agent_id=message.agent_id,
conversation_id=message.conversation_id,
created_at=message.created_at,
)
)
elif isinstance(lm, AssistantMessage):
letta_search_results.append(
AssistantMessageListResult(
message_id=message.id,
message_type=lm.message_type,
content=lm.content,
agent_id=message.agent_id,
conversation_id=message.conversation_id,
created_at=message.created_at,
)
)
# Other LettaMessage variants (tool, approval, etc.) are not part of
# LettaMessageSearchResult and are intentionally skipped here.
return letta_search_results
def to_letta_messages(
self,
use_assistant_message: bool = False,
assistant_message_tool_name: str = DEFAULT_MESSAGE_TOOL,
assistant_message_tool_kwarg: str = DEFAULT_MESSAGE_TOOL_KWARG,
reverse: bool = True,
include_err: Optional[bool] = None,
text_is_assistant_message: bool = False,
convert_summary_to_user: bool = True,
) -> List[LettaMessage]:
"""Convert message object (in DB format) to the style used by the original Letta API
Args:
convert_summary_to_user: If True (default), summary messages are returned as UserMessage
for backward compatibility. If False, return as SummaryMessage.
"""
messages = []
if self.role == MessageRole.assistant:
if self.content:
messages.extend(self._convert_reasoning_messages(text_is_assistant_message=text_is_assistant_message))
if self.tool_calls is not None:
messages.extend(
self._convert_tool_call_messages(
current_message_count=len(messages),
use_assistant_message=use_assistant_message,
assistant_message_tool_name=assistant_message_tool_name,
assistant_message_tool_kwarg=assistant_message_tool_kwarg,
),
)
elif self.role == MessageRole.tool:
messages.append(self._convert_tool_return_message())
elif self.role == MessageRole.user:
messages.append(self._convert_user_message())
elif self.role == MessageRole.system:
messages.append(self._convert_system_message())
elif self.role == MessageRole.summary:
messages.append(self._convert_summary_message(as_user_message=convert_summary_to_user))
elif self.role == MessageRole.approval:
if self.content:
messages.extend(self._convert_reasoning_messages(text_is_assistant_message=text_is_assistant_message))
if self.tool_calls is not None:
messages.append(self._convert_approval_request_message())
else:
if self.approvals:
first_approval = [a for a in self.approvals if isinstance(a, ApprovalReturn)]
def maybe_convert_tool_return_message(maybe_tool_return):
if isinstance(maybe_tool_return, ToolReturn):
parsed_data = self._parse_tool_response(maybe_tool_return.func_response)
return LettaToolReturn(
tool_call_id=maybe_tool_return.tool_call_id,
status=maybe_tool_return.status,
tool_return=parsed_data["message"],
stdout=maybe_tool_return.stdout,
stderr=maybe_tool_return.stderr,
)
return maybe_tool_return
approval_response_message = ApprovalResponseMessage(
id=self.id,
date=self.created_at,
otid=self.otid,
approvals=[maybe_convert_tool_return_message(approval) for approval in self.approvals],
run_id=self.run_id,
# TODO: temporary populate these fields for backwards compatibility
approve=first_approval[0].approve if first_approval else None,
approval_request_id=first_approval[0].tool_call_id if first_approval else None,
reason=first_approval[0].reason if first_approval else None,
)
else:
approval_response_message = ApprovalResponseMessage(
id=self.id,
date=self.created_at,
otid=self.otid,
approve=self.approve,
approval_request_id=self.approval_request_id,
reason=self.denial_reason,
approvals=[
# TODO: temporary workaround to populate from legacy fields
ApprovalReturn(
tool_call_id=self.approval_request_id,
approve=self.approve,
reason=self.denial_reason,
)
],
run_id=self.run_id,
)
messages.append(approval_response_message)
else:
raise ValueError(f"Unknown role: {self.role}")
return messages[::-1] if reverse else messages
def _convert_reasoning_messages(
self,
current_message_count: int = 0,
text_is_assistant_message: bool = False, # For v3 loop, set to True
) -> List[LettaMessage]:
messages = []
for content_part in self.content:
otid = Message.generate_otid_from_id(self.id, current_message_count + len(messages))
if isinstance(content_part, TextContent):
if text_is_assistant_message:
# .content is assistant message
if messages and messages[-1].message_type == MessageType.assistant_message:
messages[-1].content += content_part.text
else:
messages.append(
AssistantMessage(
id=self.id,
date=self.created_at,
content=content_part.text,
name=self.name,
otid=otid,
sender_id=self.sender_id,
step_id=self.step_id,
is_err=self.is_err,
run_id=self.run_id,
)
)
else:
# .content is COT
messages.append(
ReasoningMessage(
id=self.id,
date=self.created_at,
reasoning=content_part.text,
name=self.name,
otid=otid,
sender_id=self.sender_id,
step_id=self.step_id,
is_err=self.is_err,
run_id=self.run_id,
)
)
elif isinstance(content_part, ReasoningContent):
# "native" COT
if messages and messages[-1].message_type == MessageType.reasoning_message:
messages[-1].reasoning += content_part.reasoning
else:
messages.append(
ReasoningMessage(
id=self.id,
date=self.created_at,
reasoning=content_part.reasoning,
source="reasoner_model", # TODO do we want to tag like this?
signature=content_part.signature,
name=self.name,
otid=otid,
step_id=self.step_id,
is_err=self.is_err,
run_id=self.run_id,
)
)
elif isinstance(content_part, SummarizedReasoningContent):
# TODO remove the cast and just return the native type
casted_content_part = content_part.to_reasoning_content()
if casted_content_part is not None:
messages.append(
ReasoningMessage(
id=self.id,
date=self.created_at,
reasoning=casted_content_part.reasoning,
source="reasoner_model", # TODO do we want to tag like this?
signature=casted_content_part.signature,
name=self.name,
otid=otid,
step_id=self.step_id,
is_err=self.is_err,
run_id=self.run_id,
)
)
elif isinstance(content_part, RedactedReasoningContent):
# "native" redacted/hidden COT
messages.append(
HiddenReasoningMessage(
id=self.id,
date=self.created_at,
state="redacted",
hidden_reasoning=content_part.data,
name=self.name,
otid=otid,
sender_id=self.sender_id,
step_id=self.step_id,
is_err=self.is_err,
run_id=self.run_id,
)
)
elif isinstance(content_part, OmittedReasoningContent):
# Special case for "hidden reasoning" models like o1/o3
# NOTE: we also have to think about how to return this during streaming
messages.append(
HiddenReasoningMessage(
id=self.id,
date=self.created_at,
state="omitted",
name=self.name,
otid=otid,
step_id=self.step_id,
is_err=self.is_err,
run_id=self.run_id,
)
)
elif isinstance(content_part, ToolCallContent):
# for Gemini, we need to pass in tool calls as part of the content
continue
else:
logger.warning(f"Unrecognized content part in assistant message: {content_part}")
return messages
def _convert_assistant_message(
self,
) -> AssistantMessage:
if self.content and len(self.content) == 1 and isinstance(self.content[0], TextContent):
text_content = self.content[0].text
else:
raise ValueError(f"Invalid assistant message (no text object on message): {self.content}")
return AssistantMessage(
id=self.id,
date=self.created_at,
content=text_content,
name=self.name,
otid=self.otid,
sender_id=self.sender_id,
step_id=self.step_id,
# is_err=self.is_err,
run_id=self.run_id,
)
def _convert_tool_call_messages(
self,
current_message_count: int = 0,
use_assistant_message: bool = False,
assistant_message_tool_name: str = DEFAULT_MESSAGE_TOOL,
assistant_message_tool_kwarg: str = DEFAULT_MESSAGE_TOOL_KWARG,
) -> List[LettaMessage]:
messages = []
# If assistant mode is off, just create one ToolCallMessage with all tool calls
if not use_assistant_message:
all_tool_call_objs = [
ToolCall(
name=tool_call.function.name,
arguments=tool_call.function.arguments,
tool_call_id=tool_call.id,
)
for tool_call in self.tool_calls
]
if all_tool_call_objs:
otid = Message.generate_otid_from_id(self.id, current_message_count)
messages.append(
ToolCallMessage(
id=self.id,
date=self.created_at,
# use first tool call for the deprecated field
tool_call=all_tool_call_objs[0],
tool_calls=all_tool_call_objs,
name=self.name,
otid=otid,
sender_id=self.sender_id,
step_id=self.step_id,
is_err=self.is_err,
run_id=self.run_id,
)
)
return messages
collected_tool_calls = []
for tool_call in self.tool_calls:
otid = Message.generate_otid_from_id(self.id, current_message_count + len(messages))
if tool_call.function.name == assistant_message_tool_name:
if collected_tool_calls:
tool_call_message = ToolCallMessage(
id=self.id,
date=self.created_at,
# use first tool call for the deprecated field
tool_call=collected_tool_calls[0],
tool_calls=collected_tool_calls.copy(),
name=self.name,
otid=Message.generate_otid_from_id(self.id, current_message_count + len(messages)),
sender_id=self.sender_id,
step_id=self.step_id,
is_err=self.is_err,
run_id=self.run_id,
)
messages.append(tool_call_message)
collected_tool_calls = [] # reset the collection
try:
func_args = parse_json(tool_call.function.arguments)
message_string = validate_function_response(func_args[assistant_message_tool_kwarg], 0, truncate=False)
except KeyError:
logger.error(
"Function call %s missing %s argument; skipping assistant message conversion",
tool_call.function.name,
assistant_message_tool_kwarg,
)
continue
# Ensure content is a string (validate_function_response can return dict)
if isinstance(message_string, dict):
message_string = json_dumps(message_string)
messages.append(
AssistantMessage(
id=self.id,
date=self.created_at,
content=message_string,
name=self.name,
otid=otid,
sender_id=self.sender_id,
step_id=self.step_id,
is_err=self.is_err,
run_id=self.run_id,
)
)
else:
# non-assistant tool call, collect it
tool_call_obj = ToolCall(
name=tool_call.function.name,
arguments=tool_call.function.arguments,
tool_call_id=tool_call.id,
)
collected_tool_calls.append(tool_call_obj)
# flush any remaining collected tool calls
if collected_tool_calls:
tool_call_message = ToolCallMessage(
id=self.id,
date=self.created_at,
# use first tool call for the deprecated field
tool_call=collected_tool_calls[0],
tool_calls=collected_tool_calls,
name=self.name,
otid=Message.generate_otid_from_id(self.id, current_message_count + len(messages)),
sender_id=self.sender_id,
step_id=self.step_id,
is_err=self.is_err,
run_id=self.run_id,
)
messages.append(tool_call_message)
return messages
def _convert_tool_return_message(self) -> ToolReturnMessage:
"""Convert tool role message to ToolReturnMessage.
The tool return is packaged as follows:
packaged_message = {
"status": "OK" if was_success else "Failed",
"message": response_string,
"time": formatted_time,
}
Returns:
ToolReturnMessage: Converted tool return message
Raises:
ValueError: If message role is not 'tool', parsing fails, or no valid content exists
"""
if self.role != MessageRole.tool:
raise ValueError(f"Cannot convert message of type {self.role} to ToolReturnMessage")
# This is a very special buggy case during the double writing period
# where there is no tool call id on the tool return object, but it exists top level
# This is meant to be a short term patch - this can happen when people are using old agent files that were exported
# during a specific migration state
if len(self.tool_returns) == 1 and self.tool_call_id and not self.tool_returns[0].tool_call_id:
self.tool_returns[0].tool_call_id = self.tool_call_id
if self.tool_returns:
return self._convert_explicit_tool_returns()
return self._convert_legacy_tool_return()
def _convert_explicit_tool_returns(self) -> ToolReturnMessage:
"""Convert explicit tool returns to a single ToolReturnMessage."""
# build list of all tool return objects
all_tool_returns = []
for tool_return in self.tool_returns:
parsed_data = self._parse_tool_response(tool_return.func_response)
# Preserve multi-modal content (ToolReturn supports Union[str, List])
if isinstance(tool_return.func_response, list):
tool_return_value = tool_return.func_response
else:
tool_return_value = parsed_data["message"]
tool_return_obj = LettaToolReturn(
tool_return=tool_return_value,
status=parsed_data["status"],
tool_call_id=tool_return.tool_call_id,
stdout=tool_return.stdout,
stderr=tool_return.stderr,
)
all_tool_returns.append(tool_return_obj)
if not all_tool_returns:
# this should not happen if tool_returns is non-empty, but handle gracefully
raise ValueError("No tool returns to convert")
first_tool_return = all_tool_returns[0]
# Convert deprecated string-only field to text (preserve images in tool_returns list)
deprecated_tool_return_text = (
tool_return_to_text(first_tool_return.tool_return)
if isinstance(first_tool_return.tool_return, list)
else first_tool_return.tool_return
)
return ToolReturnMessage(
id=self.id,
date=self.created_at,
# deprecated top-level fields populated from first tool return
tool_return=deprecated_tool_return_text,
status=first_tool_return.status,
tool_call_id=first_tool_return.tool_call_id,
stdout=first_tool_return.stdout,
stderr=first_tool_return.stderr,
tool_returns=all_tool_returns,
name=self.name,
otid=Message.generate_otid_from_id(self.id, 0),
sender_id=self.sender_id,
step_id=self.step_id,
is_err=self.is_err,
run_id=self.run_id,
)
def _convert_legacy_tool_return(self) -> ToolReturnMessage:
"""Convert legacy single text content to ToolReturnMessage."""
if not self._has_single_text_content():
raise ValueError(f"No valid tool returns to convert: {self}")
text_content = self.content[0].text
parsed_data = self._parse_tool_response(text_content)
return self._create_tool_return_message(
message_text=parsed_data["message"],
status=parsed_data["status"],
tool_call_id=self.tool_call_id,
stdout=None,
stderr=None,
otid_index=0,
)
def _has_single_text_content(self) -> bool:
"""Check if message has exactly one text content item."""
return self.content and len(self.content) == 1 and isinstance(self.content[0], TextContent)
def _parse_tool_response(self, response_text: Union[str, List]) -> dict:
"""Parse tool response JSON and extract message and status.
Args:
response_text: Raw JSON response text OR list of content parts (for multi-modal)
Returns:
Dictionary with 'message' and 'status' keys
Raises:
ValueError: If JSON parsing fails
"""
# Handle multi-modal content (list with text/images)
if isinstance(response_text, list):
text_representation = tool_return_to_text(response_text) or "[Multi-modal content]"
return {
"message": text_representation,
"status": "success",
}
try:
function_return = parse_json(response_text)
return {
"message": str(function_return.get("message", response_text)),
"status": self._parse_tool_status(function_return.get("status", "OK")),
}
except json.JSONDecodeError as e:
raise ValueError(f"Failed to decode function return: {response_text}") from e
def _create_tool_return_message(
self,
message_text: str,
status: str,
tool_call_id: Optional[str],
stdout: Optional[str],
stderr: Optional[str],
otid_index: int,
) -> ToolReturnMessage:
"""Create a ToolReturnMessage with common attributes.
Args:
message_text: The tool return message text
status: Tool execution status
tool_call_id: Optional tool call identifier
stdout: Optional standard output
stderr: Optional standard error
otid_index: Index for OTID generation
Returns:
Configured ToolReturnMessage instance
"""
tool_return_obj = LettaToolReturn(
tool_return=message_text,
status=status,
tool_call_id=tool_call_id,
stdout=stdout,
stderr=stderr,
)
return ToolReturnMessage(
id=self.id,
date=self.created_at,
tool_return=message_text,
status=status,
tool_call_id=tool_call_id,
stdout=stdout,
stderr=stderr,
tool_returns=[tool_return_obj],
name=self.name,
otid=Message.generate_otid_from_id(self.id, otid_index),
sender_id=self.sender_id,
step_id=self.step_id,
is_err=self.is_err,
run_id=self.run_id,
)
@staticmethod
def _parse_tool_status(status: str) -> Literal["success", "error"]:
"""Convert tool status string to enum value"""
if status == "OK":
return "success"
elif status == "Failed":
return "error"
else:
raise ValueError(f"Invalid status: {status}")
def _convert_approval_request_message(self) -> ApprovalRequestMessage:
"""Convert approval request message to ApprovalRequestMessage"""
def _convert_tool_call(tool_call):
return ToolCall(
name=tool_call.function.name,
arguments=tool_call.function.arguments,
tool_call_id=tool_call.id,
)
return ApprovalRequestMessage(
id=self.id,
date=self.created_at,
otid=self.otid,
sender_id=self.sender_id,
step_id=self.step_id,
run_id=self.run_id,
tool_call=_convert_tool_call(self.tool_calls[0]), # backwards compatibility
tool_calls=[_convert_tool_call(tc) for tc in self.tool_calls],
name=self.name,
)
def _convert_user_message(self) -> UserMessage:
"""Convert user role message to UserMessage"""
# Extract text content
if self.content and len(self.content) == 1 and isinstance(self.content[0], TextContent):
text_content = self.content[0].text
elif self.content:
text_content = self.content
else:
raise ValueError(f"Invalid user message (no text object on message): {self.content}")
message = unpack_message(text_content)
return UserMessage(
id=self.id,
date=self.created_at,
content=message,
name=self.name,
otid=self.otid,
sender_id=self.sender_id,
step_id=self.step_id,
is_err=self.is_err,
run_id=self.run_id,
)
def _convert_system_message(self) -> SystemMessage:
"""Convert system role message to SystemMessage"""
if self.content and len(self.content) == 1 and isinstance(self.content[0], TextContent):
text_content = self.content[0].text
else:
raise ValueError(f"Invalid system message (no text object on system): {self.content}")
return SystemMessage(
id=self.id,
date=self.created_at,
content=text_content,
name=self.name,
otid=self.otid,
sender_id=self.sender_id,
step_id=self.step_id,
run_id=self.run_id,
)
def _convert_summary_message(self, as_user_message: bool = True) -> Union[SummaryMessage, UserMessage]:
"""Convert summary role message to SummaryMessage or UserMessage.
Args:
as_user_message: If True, return UserMessage for backward compatibility with
clients that don't support SummaryMessage. If False, return SummaryMessage.
"""
if self.content and len(self.content) == 1 and isinstance(self.content[0], TextContent):
text_content = self.content[0].text
else:
raise ValueError(f"Invalid summary message (no text object on message): {self.content}")
# Unpack the summary from the packed JSON format
# The packed format is: {"type": "system_alert", "message": "...", "time": "...", "compaction_stats": {...}}
summary = unpack_message(text_content)
# Extract compaction_stats from the packed JSON using shared helper
compaction_stats = extract_compaction_stats_from_packed_json(text_content)
if as_user_message:
# Return as UserMessage for backward compatibility
return UserMessage(
id=self.id,
date=self.created_at,
content=summary,
name=self.name,
otid=self.otid,
sender_id=self.sender_id,
step_id=self.step_id,
is_err=self.is_err,
run_id=self.run_id,
)
else:
return SummaryMessage(
id=self.id,
date=self.created_at,
summary=summary,
otid=self.otid,
step_id=self.step_id,
run_id=self.run_id,
compaction_stats=compaction_stats,
)
@staticmethod
def dict_to_message(
agent_id: str,
openai_message_dict: dict,
model: Optional[str] = None, # model used to make function call
allow_functions_style: bool = False, # allow deprecated functions style?
created_at: Optional[datetime] = None,
id: Optional[str] = None,
name: Optional[str] = None,
group_id: Optional[str] = None,
tool_returns: Optional[List[ToolReturn]] = None,
run_id: Optional[str] = None,
) -> Message:
"""Convert a ChatCompletion message object into a Message object (synced to DB)"""
if not created_at:
# timestamp for creation
created_at = get_utc_time()
assert "role" in openai_message_dict, openai_message_dict
assert "content" in openai_message_dict, openai_message_dict
# TODO(caren) implicit support for only non-parts/list content types
if openai_message_dict["content"] is not None and type(openai_message_dict["content"]) is not str:
raise ValueError(f"Invalid content type: {type(openai_message_dict['content'])}")
content: List[LettaMessageContentUnion] = (
[TextContent(text=openai_message_dict["content"])] if openai_message_dict["content"] else []
)
# This is really hacky and this interface is poorly designed, we should auto derive tool_returns instead of passing it in
if not tool_returns:
tool_returns = []
if "tool_returns" in openai_message_dict:
tool_returns = [ToolReturn(**tr) for tr in openai_message_dict["tool_returns"]]
# TODO(caren) bad assumption here that "reasoning_content" always comes before "redacted_reasoning_content"
if "reasoning_content" in openai_message_dict and openai_message_dict["reasoning_content"]:
content.append(
ReasoningContent(
reasoning=openai_message_dict["reasoning_content"],
is_native=True,
signature=(
str(openai_message_dict["reasoning_content_signature"])
if "reasoning_content_signature" in openai_message_dict
else None
),
),
)
if "redacted_reasoning_content" in openai_message_dict and openai_message_dict["redacted_reasoning_content"]:
content.append(
RedactedReasoningContent(
data=str(openai_message_dict["redacted_reasoning_content"]),
),
)
if "omitted_reasoning_content" in openai_message_dict and openai_message_dict["omitted_reasoning_content"]:
content.append(
OmittedReasoningContent(),
)
# If we're going from deprecated function form
if openai_message_dict["role"] == "function":
if not allow_functions_style:
raise DeprecationWarning(openai_message_dict)
assert "tool_call_id" in openai_message_dict, openai_message_dict
# Convert from 'function' response to a 'tool' response
if id is not None:
return Message(
agent_id=agent_id,
model=model,
# standard fields expected in an OpenAI ChatCompletion message object
role=MessageRole.tool, # NOTE
content=content,
name=name,
tool_calls=openai_message_dict["tool_calls"] if "tool_calls" in openai_message_dict else None,
tool_call_id=openai_message_dict["tool_call_id"] if "tool_call_id" in openai_message_dict else None,
created_at=created_at,
id=str(id),
tool_returns=tool_returns,
group_id=group_id,
run_id=run_id,
)
else:
return Message(
agent_id=agent_id,
model=model,
# standard fields expected in an OpenAI ChatCompletion message object
role=MessageRole.tool, # NOTE
content=content,
name=name,
tool_calls=openai_message_dict["tool_calls"] if "tool_calls" in openai_message_dict else None,
tool_call_id=openai_message_dict["tool_call_id"] if "tool_call_id" in openai_message_dict else None,
created_at=created_at,
tool_returns=tool_returns,
group_id=group_id,
run_id=run_id,
)
elif "function_call" in openai_message_dict and openai_message_dict["function_call"] is not None:
if not allow_functions_style:
raise DeprecationWarning(openai_message_dict)
assert openai_message_dict["role"] == "assistant", openai_message_dict
assert "tool_call_id" in openai_message_dict, openai_message_dict
# Convert a function_call (from an assistant message) into a tool_call
# NOTE: this does not conventionally include a tool_call_id (ToolCall.id), it's on the caster to provide it
tool_calls = [
OpenAIToolCall(
id=openai_message_dict["tool_call_id"], # NOTE: unconventional source, not to spec
type="function",
function=OpenAIFunction(
name=openai_message_dict["function_call"]["name"],
arguments=openai_message_dict["function_call"]["arguments"],
),
)
]
if id is not None:
return Message(
agent_id=agent_id,
model=model,
# standard fields expected in an OpenAI ChatCompletion message object
role=MessageRole(openai_message_dict["role"]),
content=content,
name=name,
tool_calls=tool_calls,
tool_call_id=None, # NOTE: None, since this field is only non-null for role=='tool'
created_at=created_at,
id=str(id),
tool_returns=tool_returns,
group_id=group_id,
run_id=run_id,
)
else:
return Message(
agent_id=agent_id,
model=model,
# standard fields expected in an OpenAI ChatCompletion message object
role=MessageRole(openai_message_dict["role"]),
content=content,
name=openai_message_dict["name"] if "name" in openai_message_dict else None,
tool_calls=tool_calls,
tool_call_id=None, # NOTE: None, since this field is only non-null for role=='tool'
created_at=created_at,
tool_returns=tool_returns,
group_id=group_id,
run_id=run_id,
)
else:
# Basic sanity check
if openai_message_dict["role"] == "tool":
assert "tool_call_id" in openai_message_dict and openai_message_dict["tool_call_id"] is not None, openai_message_dict
else:
if "tool_call_id" in openai_message_dict:
assert openai_message_dict["tool_call_id"] is None, openai_message_dict
if "tool_calls" in openai_message_dict and openai_message_dict["tool_calls"] is not None:
assert openai_message_dict["role"] == "assistant", openai_message_dict
tool_calls = [
OpenAIToolCall(id=tool_call["id"], type=tool_call["type"], function=tool_call["function"])
for tool_call in openai_message_dict["tool_calls"]
]
else:
tool_calls = None
# If we're going from tool-call style
if id is not None:
return Message(
agent_id=agent_id,
model=model,
# standard fields expected in an OpenAI ChatCompletion message object
role=MessageRole(openai_message_dict["role"]),
content=content,
name=openai_message_dict["name"] if "name" in openai_message_dict else name,
tool_calls=tool_calls,
tool_call_id=openai_message_dict["tool_call_id"] if "tool_call_id" in openai_message_dict else None,
created_at=created_at,
id=str(id),
tool_returns=tool_returns,
group_id=group_id,
run_id=run_id,
)
else:
return Message(
agent_id=agent_id,
model=model,
# standard fields expected in an OpenAI ChatCompletion message object
role=MessageRole(openai_message_dict["role"]),
content=content,
name=openai_message_dict["name"] if "name" in openai_message_dict else name,
tool_calls=tool_calls,
tool_call_id=openai_message_dict["tool_call_id"] if "tool_call_id" in openai_message_dict else None,
created_at=created_at,
tool_returns=tool_returns,
group_id=group_id,
run_id=run_id,
)
def to_openai_dict_search_results(self, max_tool_id_length: int = TOOL_CALL_ID_MAX_LEN) -> dict:
result_json = self.to_openai_dict()
search_result_json = {"timestamp": self.created_at, "message": {"content": result_json["content"], "role": result_json["role"]}}
return search_result_json
def to_openai_dict(
self,
max_tool_id_length: int = TOOL_CALL_ID_MAX_LEN,
put_inner_thoughts_in_kwargs: bool = False,
use_developer_message: bool = False,
# if true, then treat the content field as AssistantMessage
native_content: bool = False,
strip_request_heartbeat: bool = False,
tool_return_truncation_chars: Optional[int] = None,
) -> dict | None:
"""Go from Message class to ChatCompletion message object"""
assert not (native_content and put_inner_thoughts_in_kwargs), "native_content and put_inner_thoughts_in_kwargs cannot both be true"
if self.role == "approval" and self.tool_calls is None:
return None
# TODO change to pydantic casting, eg `return SystemMessageModel(self)`
# If we only have one content part and it's text, treat it as COT
parse_content_parts = False
if self.content and len(self.content) == 1 and isinstance(self.content[0], TextContent):
text_content = self.content[0].text
elif self.content and len(self.content) == 1 and isinstance(self.content[0], ToolReturnContent):
text_content = self.content[0].content
elif self.content and len(self.content) == 1 and isinstance(self.content[0], ImageContent):
text_content = "[Image Here]"
# Otherwise, check if we have TextContent and multiple other parts
elif self.content and len(self.content) > 1:
text_parts = [content for content in self.content if isinstance(content, TextContent)]
# assert len(text) == 1, f"multiple text content parts found in a single message: {self.content}"
text_content = "\n\n".join([t.text for t in text_parts])
# Summarizer transcripts use this OpenAI-style dict; include a compact image placeholder
image_count = len([c for c in self.content if isinstance(c, ImageContent)])
if image_count > 0:
placeholder = "[Image omitted]" if image_count == 1 else f"[{image_count} images omitted]"
text_content = (text_content + (" " if text_content else "")) + placeholder
parse_content_parts = True
else:
text_content = None
# TODO(caren) we should eventually support multiple content parts here?
# ie, actually make dict['content'] type list
# But for now, it's OK until we support multi-modal,
# since the only "parts" we have are for supporting various COT
if self.role == "system":
openai_message = {
"content": text_content,
"role": "developer" if use_developer_message else self.role,
}
elif self.role == "user":
assert text_content is not None, vars(self)
openai_message = {
"content": text_content,
"role": self.role,
}
elif self.role == "summary":
# Summary messages are converted to user messages (same as current system_alert behavior)
assert text_content is not None, vars(self)
openai_message = {
"content": text_content,
"role": "user",
}
elif self.role == "assistant" or self.role == "approval":
try:
assert self.tool_calls is not None or text_content is not None, vars(self)
except AssertionError as e:
# relax check if this message only contains reasoning content
if self.content is not None and len(self.content) > 0:
# Check if all non-empty content is reasoning-related
all_reasoning = all(
isinstance(c, (ReasoningContent, SummarizedReasoningContent, OmittedReasoningContent, RedactedReasoningContent))
for c in self.content
)
if all_reasoning:
return None
raise e
# if native content, then put it directly inside the content
if native_content:
openai_message = {
# TODO support listed content (if it's possible for role assistant?)
# "content": self.content,
"content": text_content, # here content is not reasoning, it's assistant message
"role": "assistant",
}
# otherwise, if inner_thoughts_in_kwargs, hold it for the tool calls
else:
openai_message = {
"content": None if (put_inner_thoughts_in_kwargs and self.tool_calls is not None) else text_content,
"role": "assistant",
}
if self.tool_calls is not None:
if put_inner_thoughts_in_kwargs:
# put the inner thoughts inside the tool call before casting to a dict
openai_message["tool_calls"] = [
add_inner_thoughts_to_tool_call(
tool_call,
inner_thoughts=text_content,
inner_thoughts_key=INNER_THOUGHTS_KWARG,
).model_dump()
for tool_call in self.tool_calls
]
else:
openai_message["tool_calls"] = [tool_call.model_dump() for tool_call in self.tool_calls]
if strip_request_heartbeat:
for tool_call_dict in openai_message["tool_calls"]:
tool_call_dict.pop(REQUEST_HEARTBEAT_PARAM, None)
if max_tool_id_length:
for tool_call_dict in openai_message["tool_calls"]:
tool_call_dict["id"] = tool_call_dict["id"][:max_tool_id_length]
elif self.role == "tool":
# Handle tool returns - if tool_returns exists, use the first one
if self.tool_returns and len(self.tool_returns) > 0:
tool_return = self.tool_returns[0]
if not tool_return.tool_call_id:
raise TypeError("OpenAI API requires tool_call_id to be set.")
# Convert to text first (replaces images with placeholders), then truncate
func_response_text = tool_return_to_text(tool_return.func_response)
func_response = truncate_tool_return(func_response_text, tool_return_truncation_chars)
openai_message = {
"content": func_response,
"role": self.role,
"tool_call_id": tool_return.tool_call_id[:max_tool_id_length] if max_tool_id_length else tool_return.tool_call_id,
}
else:
# Legacy fallback for old message format
assert self.tool_call_id is not None, vars(self)
legacy_content = truncate_tool_return(text_content, tool_return_truncation_chars)
openai_message = {
"content": legacy_content,
"role": self.role,
"tool_call_id": self.tool_call_id[:max_tool_id_length] if max_tool_id_length else self.tool_call_id,
}
else:
raise ValueError(self.role)
# Optional field, do not include if null or invalid
if self.name is not None:
if bool(re.match(r"^[^\s<|\\/>]+$", self.name)):
openai_message["name"] = self.name
else:
logger.warning(f"Using OpenAI with invalid 'name' field (name={self.name} role={self.role}).")
if parse_content_parts and self.content is not None:
for content in self.content:
if isinstance(content, ReasoningContent):
openai_message["reasoning_content"] = content.reasoning
if content.signature:
openai_message["reasoning_content_signature"] = content.signature
if isinstance(content, RedactedReasoningContent):
openai_message["redacted_reasoning_content"] = content.data
return openai_message
@staticmethod
def to_openai_dicts_from_list(
messages: List[Message],
max_tool_id_length: int = TOOL_CALL_ID_MAX_LEN,
put_inner_thoughts_in_kwargs: bool = False,
use_developer_message: bool = False,
tool_return_truncation_chars: Optional[int] = None,
) -> List[dict]:
messages = Message.filter_messages_for_llm_api(messages)
result: List[dict] = []
for m in messages:
# Special case: OpenAI Chat Completions requires a separate tool message per tool_call_id
# If we have multiple explicit tool_returns on a single Message, expand into one dict per return
if m.role == MessageRole.tool and m.tool_returns and len(m.tool_returns) > 0:
for tr in m.tool_returns:
if not tr.tool_call_id:
raise TypeError("ToolReturn came back without a tool_call_id.")
# Convert multi-modal to text (images → placeholders), then truncate
func_response_text = tool_return_to_text(tr.func_response)
func_response = truncate_tool_return(func_response_text, tool_return_truncation_chars)
result.append(
{
"content": func_response,
"role": "tool",
"tool_call_id": tr.tool_call_id[:max_tool_id_length] if max_tool_id_length else tr.tool_call_id,
}
)
continue
d = m.to_openai_dict(
max_tool_id_length=max_tool_id_length,
put_inner_thoughts_in_kwargs=put_inner_thoughts_in_kwargs,
use_developer_message=use_developer_message,
tool_return_truncation_chars=tool_return_truncation_chars,
)
if d is not None:
result.append(d)
return result
def to_openai_responses_dicts(
self,
max_tool_id_length: int = TOOL_CALL_ID_MAX_LEN,
tool_return_truncation_chars: Optional[int] = None,
) -> List[dict]:
"""Go from Message class to ChatCompletion message object"""
if self.role == "approval" and self.tool_calls is None:
return []
message_dicts = []
if self.role == "system":
text_parts = [c.text for c in (self.content or []) if isinstance(c, TextContent)]
if not text_parts:
logger.warning(
f"System message {self.id} has no text content, skipping: roles={[type(c).__name__ for c in (self.content or [])]}"
)
return message_dicts
system_text = "\n\n".join(text_parts)
message_dicts.append(
{
"role": "developer",
"content": system_text,
}
)
elif self.role == "user":
assert self.content, vars(self)
assert all([isinstance(c, TextContent) or isinstance(c, ImageContent) for c in self.content]), vars(self)
user_dict = {
"role": self.role.value if hasattr(self.role, "value") else self.role,
"content": self._build_responses_user_content(),
}
# Optional field, do not include if null or invalid
if self.name is not None:
if bool(re.match(r"^[^\s<|\\/>]+$", self.name)):
user_dict["name"] = self.name
else:
logger.warning(f"Using OpenAI with invalid 'name' field (name={self.name} role={self.role}).")
message_dicts.append(user_dict)
elif self.role == "summary":
# Summary messages are converted to user messages (same as current system_alert behavior)
assert self.content and len(self.content) == 1 and isinstance(self.content[0], TextContent), vars(self)
message_dicts.append(
{
"role": "user",
"content": self.content[0].text,
}
)
elif self.role == "assistant" or self.role == "approval":
# Validate that message has content OpenAI Responses API can process
if self.tool_calls is None and (self.content is None or len(self.content) == 0):
# Skip this message (similar to Anthropic handling at line 1308)
return message_dicts
# A few things may be in here, firstly reasoning content, secondly assistant messages, thirdly tool calls
# TODO check if OpenAI Responses is capable of R->A->T like Anthropic?
if self.content is not None:
for content_part in self.content:
if isinstance(content_part, SummarizedReasoningContent):
message_dicts.append(
{
"type": "reasoning",
"id": content_part.id,
"summary": [{"type": "summary_text", "text": s.text} for s in content_part.summary],
"encrypted_content": content_part.encrypted_content,
}
)
elif isinstance(content_part, TextContent):
message_dicts.append(
{
"role": "assistant",
"content": content_part.text,
}
)
# else skip
if self.tool_calls is not None:
for tool_call in self.tool_calls:
message_dicts.append(
{
"type": "function_call",
"call_id": tool_call.id[:max_tool_id_length] if max_tool_id_length else tool_call.id,
"name": tool_call.function.name,
"arguments": tool_call.function.arguments,
"status": "completed", # TODO check if needed?
}
)
elif self.role == "tool":
# Handle tool returns - supports images via content arrays
if self.tool_returns:
for tool_return in self.tool_returns:
if not tool_return.tool_call_id:
raise TypeError("OpenAI Responses API requires tool_call_id to be set.")
output = self._tool_return_to_responses_output(tool_return.func_response, tool_return_truncation_chars)
message_dicts.append(
{
"type": "function_call_output",
"call_id": tool_return.tool_call_id[:max_tool_id_length] if max_tool_id_length else tool_return.tool_call_id,
"output": output,
}
)
else:
# Legacy fallback for old message format
assert self.tool_call_id is not None, vars(self)
assert len(self.content) == 1 and isinstance(self.content[0], TextContent), vars(self)
legacy_output = truncate_tool_return(self.content[0].text, tool_return_truncation_chars)
message_dicts.append(
{
"type": "function_call_output",
"call_id": self.tool_call_id[:max_tool_id_length] if max_tool_id_length else self.tool_call_id,
"output": legacy_output,
}
)
else:
raise ValueError(self.role)
return message_dicts
def _build_responses_user_content(self) -> List[dict]:
content_parts: List[dict] = []
for content in self.content or []:
if isinstance(content, TextContent):
content_parts.append({"type": "input_text", "text": content.text})
elif isinstance(content, ImageContent):
image_part = self._image_content_to_responses_part(content)
if image_part:
content_parts.append(image_part)
if not content_parts:
content_parts.append({"type": "input_text", "text": ""})
return content_parts
@staticmethod
def _image_content_to_responses_part(image_content: ImageContent) -> Optional[dict]:
image_url = Message._image_source_to_data_url(image_content)
if not image_url:
return None
detail = getattr(image_content.source, "detail", None) or "auto"
return {"type": "input_image", "image_url": image_url, "detail": detail}
@staticmethod
def _image_source_to_data_url(image_content: ImageContent) -> Optional[str]:
source = image_content.source
if source.type == ImageSourceType.base64:
data = getattr(source, "data", None)
if not data:
return None
media_type = getattr(source, "media_type", None) or "image/png"
return f"data:{media_type};base64,{data}"
if source.type == ImageSourceType.url:
return getattr(source, "url", None)
if source.type == ImageSourceType.letta:
data = getattr(source, "data", None)
if not data:
return None
media_type = getattr(source, "media_type", None) or "image/png"
return f"data:{media_type};base64,{data}"
return None
@staticmethod
def _image_dict_to_data_url(part: dict) -> Optional[str]:
"""Convert image dict to data URL."""
source = part.get("source", {})
if source.get("type") == "base64" and source.get("data"):
media_type = source.get("media_type", "image/png")
return f"data:{media_type};base64,{source['data']}"
elif source.get("type") == "url":
return source.get("url")
return None
@staticmethod
def _tool_return_to_responses_output(
func_response: Optional[Union[str, List]],
tool_return_truncation_chars: Optional[int] = None,
) -> Union[str, List[dict]]:
"""Convert tool return to OpenAI Responses API format."""
if func_response is None:
return ""
if isinstance(func_response, str):
return truncate_tool_return(func_response, tool_return_truncation_chars) or ""
output_parts: List[dict] = []
for part in func_response:
if isinstance(part, TextContent):
text = truncate_tool_return(part.text, tool_return_truncation_chars) or ""
output_parts.append({"type": "input_text", "text": text})
elif isinstance(part, ImageContent):
image_url = Message._image_source_to_data_url(part)
if image_url:
detail = getattr(part.source, "detail", None) or "auto"
output_parts.append({"type": "input_image", "image_url": image_url, "detail": detail})
elif isinstance(part, dict):
if part.get("type") == "text":
text = truncate_tool_return(part.get("text", ""), tool_return_truncation_chars) or ""
output_parts.append({"type": "input_text", "text": text})
elif part.get("type") == "image":
image_url = Message._image_dict_to_data_url(part)
if image_url:
detail = part.get("source", {}).get("detail", "auto")
output_parts.append({"type": "input_image", "image_url": image_url, "detail": detail})
return output_parts if output_parts else ""
@staticmethod
def to_openai_responses_dicts_from_list(
messages: List[Message],
max_tool_id_length: int = TOOL_CALL_ID_MAX_LEN,
tool_return_truncation_chars: Optional[int] = None,
) -> List[dict]:
messages = Message.filter_messages_for_llm_api(messages)
result = []
for message in messages:
result.extend(
message.to_openai_responses_dicts(
max_tool_id_length=max_tool_id_length, tool_return_truncation_chars=tool_return_truncation_chars
)
)
return result
@staticmethod
def _get_base64_image_data(part: Union[ImageContent, dict]) -> Optional[tuple[str, str]]:
"""Extract base64 data and media type from ImageContent or dict."""
if isinstance(part, ImageContent):
source = part.source
if source.type == ImageSourceType.base64:
return source.data, source.media_type
elif source.type == ImageSourceType.letta and getattr(source, "data", None):
return source.data, getattr(source, "media_type", None) or "image/png"
elif isinstance(part, dict) and part.get("type") == "image":
source = part.get("source", {})
if source.get("type") == "base64" and source.get("data"):
return source["data"], source.get("media_type", "image/png")
return None
@staticmethod
def _tool_return_to_google_parts(
func_response: Optional[Union[str, List]],
tool_return_truncation_chars: Optional[int] = None,
) -> tuple[str, List[dict]]:
"""Extract text and image parts for Google API format."""
if isinstance(func_response, str):
return truncate_tool_return(func_response, tool_return_truncation_chars) or "", []
text_parts = []
image_parts = []
for part in func_response:
if text := _get_text_from_part(part):
text_parts.append(text)
elif image_data := Message._get_base64_image_data(part):
data, media_type = image_data
image_parts.append({"inlineData": {"data": data, "mimeType": media_type}})
text = truncate_tool_return("\n".join(text_parts), tool_return_truncation_chars) or ""
if image_parts:
suffix = f"[{len(image_parts)} image(s) attached]"
text = f"{text}\n{suffix}" if text else suffix
return text, image_parts
@staticmethod
def _tool_return_to_anthropic_content(
func_response: Optional[Union[str, List]],
tool_return_truncation_chars: Optional[int] = None,
) -> Union[str, List[dict]]:
"""Convert tool return to Anthropic tool_result content format."""
if func_response is None:
return ""
if isinstance(func_response, str):
return truncate_tool_return(func_response, tool_return_truncation_chars) or ""
content: List[dict] = []
for part in func_response:
if text := _get_text_from_part(part):
text = truncate_tool_return(text, tool_return_truncation_chars) or ""
content.append({"type": "text", "text": text})
elif image_data := Message._get_base64_image_data(part):
data, media_type = image_data
content.append({"type": "image", "source": {"type": "base64", "data": data, "media_type": media_type}})
return content if content else ""
def to_anthropic_dict(
self,
current_model: str,
inner_thoughts_xml_tag="thinking",
put_inner_thoughts_in_kwargs: bool = False,
# if true, then treat the content field as AssistantMessage
native_content: bool = False,
strip_request_heartbeat: bool = False,
tool_return_truncation_chars: Optional[int] = None,
) -> dict | None:
"""
Convert to an Anthropic message dictionary
Args:
inner_thoughts_xml_tag (str): The XML tag to wrap around inner thoughts
"""
assert not (native_content and put_inner_thoughts_in_kwargs), "native_content and put_inner_thoughts_in_kwargs cannot both be true"
if self.role == "approval" and self.tool_calls is None:
return None
# Check for COT
if self.content and len(self.content) == 1 and isinstance(self.content[0], TextContent):
text_content = self.content[0].text
else:
text_content = None
def add_xml_tag(string: str, xml_tag: Optional[str]):
# NOTE: Anthropic docs recommends using <thinking> tag when using CoT + tool use
if f"<{xml_tag}>" in string and f"</{xml_tag}>" in string:
# don't nest if tags already exist
return string
return f"<{xml_tag}>{string}</{xml_tag}" if xml_tag else string
if self.role == "system":
# NOTE: this is not for system instructions, but instead system "events"
system_text = text_content
if system_text is None:
text_parts = [c.text for c in (self.content or []) if isinstance(c, TextContent)]
if not text_parts:
from letta.log import get_logger as _get_logger
_get_logger(__name__).warning(
f"System message {self.id} has no text content, skipping: roles={[type(c).__name__ for c in (self.content or [])]}"
)
return None
system_text = "\n\n".join(text_parts)
# Two options here, we would use system.package_system_message,
# or use a more Anthropic-specific packaging ie xml tags
user_system_event = add_xml_tag(string=f"SYSTEM ALERT: {system_text}", xml_tag="event")
anthropic_message = {
"content": user_system_event,
"role": "user",
}
elif self.role == "user":
# special case for text-only message
if text_content is not None:
anthropic_message = {
"content": text_content,
"role": self.role,
}
else:
content_parts = []
for content in self.content:
if isinstance(content, TextContent):
content_parts.append({"type": "text", "text": content.text})
elif isinstance(content, ImageContent):
content_parts.append(
{
"type": "image",
"source": {
"type": "base64",
"data": content.source.data,
"media_type": content.source.media_type,
},
}
)
else:
raise ValueError(f"Unsupported content type: {content.type}")
anthropic_message = {
"content": content_parts,
"role": self.role,
}
elif self.role == "summary":
# Summary messages are converted to user messages (same as current system_alert behavior)
assert text_content is not None, vars(self)
anthropic_message = {
"content": text_content,
"role": "user",
}
elif self.role == "assistant" or self.role == "approval":
# Validate that message has content Anthropic API can process
if self.tool_calls is None and (self.content is None or len(self.content) == 0):
# Skip this message (consistent with OpenAI dict handling)
return None
anthropic_message = {
"role": "assistant",
}
content = []
if native_content:
# No special handling for TextContent
if self.content is not None:
for content_part in self.content:
# TextContent, ImageContent, ToolCallContent, ToolReturnContent, ReasoningContent, RedactedReasoningContent, OmittedReasoningContent
if isinstance(content_part, ReasoningContent):
if current_model == self.model:
block = {
"type": "thinking",
"thinking": content_part.reasoning,
}
if content_part.signature:
block["signature"] = content_part.signature
content.append(block)
elif isinstance(content_part, RedactedReasoningContent):
if current_model == self.model:
content.append(
{
"type": "redacted_thinking",
"data": content_part.data,
}
)
elif isinstance(content_part, TextContent):
content.append(
{
"type": "text",
"text": content_part.text,
}
)
else:
# Skip unsupported types eg OmmitedReasoningContent
pass
else:
# COT / reasoning / thinking
if self.content is not None and len(self.content) >= 1:
for content_part in self.content:
if isinstance(content_part, ReasoningContent):
if current_model == self.model:
block = {
"type": "thinking",
"thinking": content_part.reasoning,
}
if content_part.signature:
block["signature"] = content_part.signature
content.append(block)
if isinstance(content_part, RedactedReasoningContent):
if current_model == self.model:
content.append(
{
"type": "redacted_thinking",
"data": content_part.data,
}
)
if isinstance(content_part, TextContent):
content.append(
{
"type": "text",
"text": content_part.text,
}
)
elif text_content is not None:
content.append(
{
"type": "text",
"text": add_xml_tag(string=text_content, xml_tag=inner_thoughts_xml_tag),
}
)
# Tool calling
if self.tool_calls is not None:
for tool_call in self.tool_calls:
if put_inner_thoughts_in_kwargs:
tool_call_input = add_inner_thoughts_to_tool_call(
tool_call,
inner_thoughts=text_content,
inner_thoughts_key=INNER_THOUGHTS_KWARG,
).model_dump()
else:
tool_call_input = parse_json(tool_call.function.arguments)
if strip_request_heartbeat:
tool_call_input.pop(REQUEST_HEARTBEAT_PARAM, None)
content.append(
{
"type": "tool_use",
"id": sanitize_tool_call_id(tool_call.id),
"name": tool_call.function.name,
"input": tool_call_input,
}
)
anthropic_message["content"] = content
elif self.role == "tool":
# NOTE: Anthropic uses role "user" for "tool" responses
content = []
# Handle the case where tool_returns is None or empty
if self.tool_returns:
# For single tool returns, we can use the message's tool_call_id as fallback
# since self.tool_call_id == tool_returns[0].tool_call_id for legacy compatibility.
# For multiple tool returns (parallel tool calls), each must have its own ID
# to correctly map results to their corresponding tool invocations.
use_message_fallback = len(self.tool_returns) == 1
for idx, tool_return in enumerate(self.tool_returns):
# Get tool_call_id from tool_return; only use message fallback for single returns
resolved_tool_call_id = tool_return.tool_call_id
if not resolved_tool_call_id and use_message_fallback:
resolved_tool_call_id = self.tool_call_id
if not resolved_tool_call_id:
from letta.log import get_logger
logger = get_logger(__name__)
logger.error(
f"Missing tool_call_id in tool return and no fallback available. "
f"Message ID: {self.id}, "
f"Tool name: {self.name or 'unknown'}, "
f"Tool return index: {idx}/{len(self.tool_returns)}, "
f"Tool return status: {tool_return.status}"
)
raise TypeError(
f"Anthropic API requires tool_use_id to be set. "
f"Message ID: {self.id}, Tool: {self.name or 'unknown'}, "
f"Tool return index: {idx}/{len(self.tool_returns)}"
)
# Convert to Anthropic format (supports images)
tool_result_content = self._tool_return_to_anthropic_content(tool_return.func_response, tool_return_truncation_chars)
content.append(
{
"type": "tool_result",
"tool_use_id": sanitize_tool_call_id(resolved_tool_call_id),
"content": tool_result_content,
}
)
if content:
anthropic_message = {
"role": "user",
"content": content,
}
else:
if not self.tool_call_id:
raise TypeError("Anthropic API requires tool_use_id to be set.")
# This is for legacy reasons
legacy_content = truncate_tool_return(text_content, tool_return_truncation_chars)
anthropic_message = {
"role": "user", # NOTE: diff
"content": [
# TODO support error types etc
{
"type": "tool_result",
"tool_use_id": self.tool_call_id,
"content": legacy_content,
}
],
}
else:
raise ValueError(self.role)
return anthropic_message
@staticmethod
def to_anthropic_dicts_from_list(
messages: List[Message],
current_model: str,
inner_thoughts_xml_tag: str = "thinking",
put_inner_thoughts_in_kwargs: bool = False,
# if true, then treat the content field as AssistantMessage
native_content: bool = False,
strip_request_heartbeat: bool = False,
tool_return_truncation_chars: Optional[int] = None,
) -> List[dict]:
messages = Message.filter_messages_for_llm_api(messages)
result = [
m.to_anthropic_dict(
current_model=current_model,
inner_thoughts_xml_tag=inner_thoughts_xml_tag,
put_inner_thoughts_in_kwargs=put_inner_thoughts_in_kwargs,
native_content=native_content,
strip_request_heartbeat=strip_request_heartbeat,
tool_return_truncation_chars=tool_return_truncation_chars,
)
for m in messages
]
result = [m for m in result if m is not None]
return result
def to_google_dict(
self,
current_model: str,
put_inner_thoughts_in_kwargs: bool = True,
# if true, then treat the content field as AssistantMessage
native_content: bool = False,
strip_request_heartbeat: bool = False,
tool_return_truncation_chars: Optional[int] = None,
) -> dict | None:
"""
Go from Message class to Google AI REST message object
"""
assert not (native_content and put_inner_thoughts_in_kwargs), "native_content and put_inner_thoughts_in_kwargs cannot both be true"
if self.role == "approval" and self.tool_calls is None:
return None
# type Content: https://ai.google.dev/api/rest/v1/Content / https://ai.google.dev/api/rest/v1beta/Content
# parts[]: Part
# role: str ('user' or 'model')
if self.content and len(self.content) == 1 and isinstance(self.content[0], TextContent):
text_content = self.content[0].text
elif self.content and len(self.content) == 1 and isinstance(self.content[0], ToolReturnContent):
text_content = self.content[0].content
else:
text_content = None
if self.role != "tool" and self.name is not None:
logger.warning(f"Using Google AI with non-null 'name' field (name={self.name} role={self.role}), not yet supported.")
if self.role == "system":
# NOTE: Gemini API doesn't have a 'system' role, use 'user' instead
# https://www.reddit.com/r/Bard/comments/1b90i8o/does_gemini_have_a_system_prompt_option_while/
google_ai_message = {
"role": "user", # NOTE: no 'system'
"parts": [{"text": text_content}],
}
elif self.role == "user":
assert self.content, vars(self)
content_parts = []
for content in self.content:
if isinstance(content, TextContent):
content_parts.append({"text": content.text})
elif isinstance(content, ImageContent):
content_parts.append(
{
"inline_data": {
"data": content.source.data,
"mime_type": content.source.media_type,
}
}
)
else:
raise ValueError(f"Unsupported content type: {content.type}")
google_ai_message = {
"role": "user",
"parts": content_parts,
}
elif self.role == "summary":
# Summary messages are converted to user messages (same as current system_alert behavior)
assert text_content is not None, vars(self)
google_ai_message = {
"role": "user",
"parts": [{"text": text_content}],
}
elif self.role == "assistant" or self.role == "approval":
# Validate that message has content Google API can process
if self.tool_calls is None and text_content is None and len(self.content) <= 1:
# Message has no tool calls, no extractable text, and not multi-part
logger.warning(
f"Assistant/approval message {self.id} has no content Google API can convert: "
f"tool_calls={self.tool_calls}, text_content={text_content}, content={self.content}"
)
# Return None to skip this message (similar to approval messages without tool_calls at line 1998)
return None
google_ai_message = {
"role": "model", # NOTE: different
}
# NOTE: Google AI API doesn't allow non-null content + function call
# To get around this, just two a two part message, inner thoughts first then
parts = []
if native_content and text_content is not None:
# TODO support multi-part assistant content
parts.append({"text": text_content})
elif not put_inner_thoughts_in_kwargs and text_content is not None:
# NOTE: ideally we do multi-part for CoT / inner thoughts + function call, but Google AI API doesn't allow it
raise NotImplementedError
parts.append({"text": text_content})
if self.tool_calls is not None:
# Check if there's a signature in the content that should be included with function calls
# Google Vertex/Gemini 3 requires thought_signature to be echoed back in function calls
# Per Google docs: https://ai.google.dev/gemini-api/docs/thought-signatures
# - For parallel function calls, only the FIRST functionCall should have the signature
# - For sequential function calls (multi-step), each function call has its own signature
thought_signature = None
# Allow signatures when models match OR when self.model is None (backwards compatibility
# for older messages that may not have had their model field set)
models_compatible = self.model is None or current_model == self.model
if self.content and models_compatible:
for content in self.content:
# Check for signature in ReasoningContent, TextContent, or ToolCallContent
# Take the first non-None signature found (don't keep overwriting)
if isinstance(content, (ReasoningContent, TextContent, ToolCallContent)):
sig = getattr(content, "signature", None)
if sig is not None and thought_signature is None:
thought_signature = sig
# NOTE: implied support for multiple calls
is_first_function_call = True
for tool_call in self.tool_calls:
function_name = tool_call.function.name
function_args = tool_call.function.arguments
try:
# NOTE: Google AI wants actual JSON objects, not strings
function_args = parse_json(function_args)
except:
raise UserWarning(f"Failed to parse JSON function args: {function_args}")
function_args = {"args": function_args}
if put_inner_thoughts_in_kwargs and text_content is not None:
assert INNER_THOUGHTS_KWARG not in function_args, function_args
assert len(self.tool_calls) == 1
function_args[INNER_THOUGHTS_KWARG_VERTEX] = text_content
if strip_request_heartbeat:
function_args.pop(REQUEST_HEARTBEAT_PARAM, None)
# Build the function call part
function_call_part = {
"functionCall": {
"name": function_name,
"args": function_args,
}
}
# Include thought_signature only on the FIRST function call
# Per Google docs, for parallel function calls, only the first gets the signature
if thought_signature is not None and is_first_function_call:
function_call_part["thought_signature"] = thought_signature
is_first_function_call = False
parts.append(function_call_part)
else:
# Only add single text_content if we don't have multiple content items
# (multi-content case is handled below at the len(self.content) > 1 block)
if not native_content and not (self.content and len(self.content) > 1):
assert text_content is not None
parts.append({"text": text_content})
if self.content and len(self.content) > 1:
# Use the same models_compatible check defined above for consistency
# Allow signatures when models match OR when self.model is None (backwards compatibility)
models_compatible = self.model is None or current_model == self.model
native_google_content_parts = []
# Track if we've seen the first function call (for parallel tool calls)
seen_first_function_call = False
for content in self.content:
if isinstance(content, TextContent):
native_part = {"text": content.text}
if content.signature and models_compatible:
native_part["thought_signature"] = content.signature
native_google_content_parts.append(native_part)
elif isinstance(content, ReasoningContent):
if models_compatible:
native_google_content_parts.append({"text": content.reasoning, "thought": True})
elif isinstance(content, ToolCallContent):
native_part = {
"function_call": {
"name": content.name,
"args": content.input,
},
}
# Only include signature on the FIRST function call (for parallel tool calls)
# Per Google docs: https://ai.google.dev/gemini-api/docs/thought-signatures
if content.signature and models_compatible and not seen_first_function_call:
native_part["thought_signature"] = content.signature
seen_first_function_call = True
native_google_content_parts.append(native_part)
else:
# silently drop other content types
pass
if native_google_content_parts:
parts = native_google_content_parts
google_ai_message["parts"] = parts
elif self.role == "tool":
# NOTE: Significantly different tool calling format, more similar to function calling format
# Handle tool returns - Google supports images as sibling inlineData parts
if self.tool_returns:
parts = []
for tool_return in self.tool_returns:
if not tool_return.tool_call_id:
raise TypeError("Google AI API requires tool_call_id to be set.")
# Use the function name if available, otherwise use tool_call_id
function_name = self.name if self.name else tool_return.tool_call_id
text_content, image_parts = Message._tool_return_to_google_parts(
tool_return.func_response, tool_return_truncation_chars
)
try:
function_response = parse_json(text_content)
except:
function_response = {"function_response": text_content}
parts.append(
{
"functionResponse": {
"name": function_name,
"response": {"name": function_name, "content": function_response},
}
}
)
parts.extend(image_parts)
google_ai_message = {
"role": "function",
"parts": parts,
}
else:
# Legacy fallback for old message format
assert self.tool_call_id is not None, vars(self)
if self.name is None:
logger.warning("Couldn't find function name on tool call, defaulting to tool ID instead.")
function_name = self.tool_call_id
else:
function_name = self.name
# Truncate the legacy content if needed
legacy_content = truncate_tool_return(text_content, tool_return_truncation_chars)
# NOTE: Google AI API wants the function response as JSON only, no string
try:
function_response = parse_json(legacy_content)
except:
function_response = {"function_response": legacy_content}
google_ai_message = {
"role": "function",
"parts": [
{
"functionResponse": {
"name": function_name,
"response": {
"name": function_name, # NOTE: name twice... why?
"content": function_response,
},
}
}
],
}
else:
raise ValueError(self.role)
# Validate that parts is never empty before returning
if "parts" not in google_ai_message or not google_ai_message["parts"]:
# If parts is empty, add a default text part
google_ai_message["parts"] = [{"text": "empty message"}]
logger.warning(
f"Empty 'parts' detected in message with role '{self.role}'. Added default empty text part. Full message:\n{vars(self)}"
)
return google_ai_message
@staticmethod
def to_google_dicts_from_list(
messages: List[Message],
current_model: str,
put_inner_thoughts_in_kwargs: bool = True,
native_content: bool = False,
tool_return_truncation_chars: Optional[int] = None,
):
messages = Message.filter_messages_for_llm_api(messages)
result = [
m.to_google_dict(
current_model=current_model,
put_inner_thoughts_in_kwargs=put_inner_thoughts_in_kwargs,
native_content=native_content,
tool_return_truncation_chars=tool_return_truncation_chars,
)
for m in messages
]
result = [m for m in result if m is not None]
return result
def is_approval_request(self) -> bool:
return self.role == "approval" and self.tool_calls is not None and len(self.tool_calls) > 0
def is_approval_response(self) -> bool:
return self.role == "approval" and self.tool_calls is None and self.approve is not None
def is_summarization_message(self) -> bool:
# First-class summary role (new format)
if self.role == "summary":
return True
# Legacy format: user message with system_alert content
return (
self.role == "user"
and self.content is not None
and len(self.content) == 1
and isinstance(self.content[0], TextContent)
and "system_alert" in self.content[0].text
)
@staticmethod
def filter_messages_for_llm_api(
messages: List[Message],
) -> List[Message]:
messages = [m for m in messages if m is not None]
if len(messages) == 0:
return []
# Add special handling for legacy bug where summarization triggers in the middle of hitl
messages_to_filter = []
for i in range(len(messages) - 1):
first_message_is_approval = messages[i].is_approval_request()
second_message_is_summary = messages[i + 1].is_summarization_message()
third_message_is_optional_approval = i + 2 >= len(messages) or messages[i + 2].is_approval_response()
if first_message_is_approval and second_message_is_summary and third_message_is_optional_approval:
messages_to_filter.append(messages[i])
for idx in reversed(messages_to_filter): # reverse to avoid index shift
messages.remove(idx)
# Filter last message if it is a lone approval request without a response - this only occurs for token counting
if messages[-1].role == "approval" and messages[-1].tool_calls is not None and len(messages[-1].tool_calls) > 0:
messages.remove(messages[-1])
# Also filter pending tool call message if this turn invoked parallel tool calling
if messages and messages[-1].role == "assistant" and messages[-1].tool_calls is not None and len(messages[-1].tool_calls) > 0:
messages.remove(messages[-1])
# Filter last message if it is a lone reasoning message without assistant message or tool call
if (
messages[-1].role == "assistant"
and messages[-1].tool_calls is None
and (not messages[-1].content or all(not isinstance(content_part, TextContent) for content_part in messages[-1].content))
):
messages.remove(messages[-1])
# Collapse adjacent tool call and approval messages
messages = Message.collapse_tool_call_messages_for_llm_api(messages)
# Dedupe duplicate tool-return payloads across tool messages so downstream providers
# never see the same tool_call_id's result twice in a single request
messages = Message.dedupe_tool_messages_for_llm_api(messages)
# Dedupe duplicate tool calls within assistant messages so a single assistant message
# cannot emit multiple tool_use blocks with the same id (Anthropic requirement)
messages = Message.dedupe_tool_calls_for_llm_api(messages)
return messages
@staticmethod
def collapse_tool_call_messages_for_llm_api(
messages: List[Message],
) -> List[Message]:
adjacent_tool_call_approval_messages = []
for i in range(len(messages) - 1):
if (
messages[i].role == MessageRole.assistant
and messages[i].tool_calls is not None
and messages[i + 1].role == MessageRole.approval
and messages[i + 1].tool_calls is not None
):
adjacent_tool_call_approval_messages.append(i)
for i in reversed(adjacent_tool_call_approval_messages):
messages[i].content = messages[i].content + messages[i + 1].content
messages[i].tool_calls = messages[i].tool_calls + messages[i + 1].tool_calls
messages.remove(messages[i + 1])
return messages
@staticmethod
def dedupe_tool_messages_for_llm_api(messages: List[Message]) -> List[Message]:
"""Dedupe duplicate tool returns across tool-role messages by tool_call_id.
- For explicit tool_returns arrays: keep the first occurrence of each tool_call_id,
drop subsequent duplicates within the request.
- For legacy single tool_call_id + content messages: keep the first, drop duplicates.
- If a tool message has neither unique tool_returns nor content, drop it.
This runs prior to provider-specific formatting to reduce duplicate tool_result blocks downstream.
"""
if not messages:
return messages
from letta.log import get_logger
logger = get_logger(__name__)
seen_ids: set[str] = set()
removed_tool_msgs = 0
removed_tool_returns = 0
result: List[Message] = []
for m in messages:
if m.role != MessageRole.tool:
result.append(m)
continue
# Prefer explicit tool_returns when present
if m.tool_returns and len(m.tool_returns) > 0:
unique_returns = []
for tr in m.tool_returns:
tcid = getattr(tr, "tool_call_id", None)
if tcid and tcid in seen_ids:
removed_tool_returns += 1
continue
if tcid:
seen_ids.add(tcid)
unique_returns.append(tr)
if unique_returns:
# Replace with unique set; keep message
m.tool_returns = unique_returns
result.append(m)
else:
# No unique returns left; if legacy content exists, fall back to legacy handling below
if m.tool_call_id and m.content and len(m.content) > 0:
tcid = m.tool_call_id
if tcid in seen_ids:
removed_tool_msgs += 1
continue
seen_ids.add(tcid)
result.append(m)
else:
removed_tool_msgs += 1
continue
else:
# Legacy single-response path
tcid = getattr(m, "tool_call_id", None)
if tcid:
if tcid in seen_ids:
removed_tool_msgs += 1
continue
seen_ids.add(tcid)
result.append(m)
if removed_tool_msgs or removed_tool_returns:
logger.error(
"[Message] Deduped duplicate tool messages for request: removed_messages=%d, removed_returns=%d",
removed_tool_msgs,
removed_tool_returns,
)
return result
@staticmethod
def dedupe_tool_calls_for_llm_api(messages: List[Message]) -> List[Message]:
"""Ensure each assistant message contains unique tool_calls by id.
Anthropic requires tool_use ids to be unique within a single assistant message. When
collapsing adjacent assistant/approval messages, duplicates can sneak in. This pass keeps
the first occurrence per id and drops subsequent duplicates.
"""
if not messages:
return messages
from letta.log import get_logger
logger = get_logger(__name__)
removed_counts_total = 0
for m in messages:
if m.role != MessageRole.assistant or not m.tool_calls:
continue
seen: set[str] = set()
unique_tool_calls = []
removed = 0
for tc in m.tool_calls:
tcid = getattr(tc, "id", None)
if tcid and tcid in seen:
removed += 1
continue
if tcid:
seen.add(tcid)
unique_tool_calls.append(tc)
if removed:
m.tool_calls = unique_tool_calls
removed_counts_total += removed
if removed_counts_total:
logger.error("[Message] Deduped duplicate tool_calls in assistant messages: removed=%d", removed_counts_total)
return messages
@staticmethod
def generate_otid_from_id(message_id: str, index: int) -> str:
"""
Convert message id to bits and change the list bit to the index
"""
if index == -1:
return message_id
if not 0 <= index < 128:
raise ValueError("Index must be between 0 and 127")
message_uuid = message_id.replace("message-", "")
uuid_int = int(message_uuid.replace("-", ""), 16)
# Clear last 7 bits and set them to index; supports up to 128 unique indices
uuid_int = (uuid_int & ~0x7F) | (index & 0x7F)
hex_str = f"{uuid_int:032x}"
return f"{hex_str[:8]}-{hex_str[8:12]}-{hex_str[12:16]}-{hex_str[16:20]}-{hex_str[20:]}"
class ToolReturn(BaseModel):
tool_call_id: Optional[Any] = Field(None, description="The ID for the tool call")
status: Literal["success", "error"] = Field(..., description="The status of the tool call")
stdout: Optional[List[str]] = Field(default=None, description="Captured stdout (e.g. prints, logs) from the tool invocation")
stderr: Optional[List[str]] = Field(default=None, description="Captured stderr from the tool invocation")
func_response: Optional[Union[str, List[LettaToolReturnContentUnion]]] = Field(
None, description="The function response - either a string or list of content parts (text/image)"
)
class MessageSearchRequest(BaseModel):
"""Request model for searching messages across the organization"""
query: Optional[str] = Field(None, description="Text query for full-text search")
search_mode: Literal["vector", "fts", "hybrid"] = Field("hybrid", description="Search mode to use")
roles: Optional[List[MessageRole]] = Field(None, description="Filter messages by role")
agent_id: Optional[str] = Field(None, description="Filter messages by agent ID")
project_id: Optional[str] = Field(None, description="Filter messages by project ID")
template_id: Optional[str] = Field(None, description="Filter messages by template ID")
conversation_id: Optional[str] = Field(None, description="Filter messages by conversation ID")
limit: int = Field(50, description="Maximum number of results to return", ge=1, le=100)
start_date: Optional[datetime] = Field(None, description="Filter messages created after this date")
end_date: Optional[datetime] = Field(None, description="Filter messages created on or before this date")
class SearchAllMessagesRequest(BaseModel):
query: str = Field(..., description="Text query for full-text search")
search_mode: Literal["vector", "fts", "hybrid"] = Field("hybrid", description="Search mode to use")
agent_id: Optional[str] = Field(None, description="Filter messages by agent ID")
conversation_id: Optional[str] = Field(None, description="Filter messages by conversation ID")
limit: int = Field(50, description="Maximum number of results to return", ge=1, le=100)
start_date: Optional[datetime] = Field(None, description="Filter messages created after this date")
end_date: Optional[datetime] = Field(None, description="Filter messages created on or before this date")
class MessageSearchResult(BaseModel):
"""Result from a message search operation with scoring details."""
embedded_text: str = Field(..., description="The embedded content (LLM-friendly)")
message: Message = Field(..., description="The raw message object")
fts_rank: Optional[int] = Field(None, description="Full-text search rank position if FTS was used")
vector_rank: Optional[int] = Field(None, description="Vector search rank position if vector search was used")
rrf_score: float = Field(..., description="Reciprocal Rank Fusion combined score")