feat: add summary message and event on compaction (#9144)

* base

* update

* update

* revert formatting

* routes

* legacy

* fix

* review

* update
This commit is contained in:
jnjpng
2026-01-30 11:59:24 -08:00
committed by Caren Thomas
parent b0f8e16ac0
commit d28ccc0be6
14 changed files with 440 additions and 56 deletions

View File

@@ -7806,8 +7806,8 @@
"assistant_message": "#/components/schemas/AssistantMessage",
"approval_request_message": "#/components/schemas/ApprovalRequestMessage",
"approval_response_message": "#/components/schemas/ApprovalResponseMessage",
"summary": "#/components/schemas/SummaryMessage",
"event": "#/components/schemas/EventMessage"
"summary_message": "#/components/schemas/SummaryMessage",
"event_message": "#/components/schemas/EventMessage"
}
},
"title": "Response Modify Message"
@@ -9965,8 +9965,8 @@
"assistant_message": "#/components/schemas/AssistantMessage",
"approval_request_message": "#/components/schemas/ApprovalRequestMessage",
"approval_response_message": "#/components/schemas/ApprovalResponseMessage",
"summary": "#/components/schemas/SummaryMessage",
"event": "#/components/schemas/EventMessage"
"summary_message": "#/components/schemas/SummaryMessage",
"event_message": "#/components/schemas/EventMessage"
}
},
"title": "Response Modify Group Message"
@@ -16537,8 +16537,8 @@
"assistant_message": "#/components/schemas/AssistantMessage",
"approval_request_message": "#/components/schemas/ApprovalRequestMessage",
"approval_response_message": "#/components/schemas/ApprovalResponseMessage",
"summary": "#/components/schemas/SummaryMessage",
"event": "#/components/schemas/EventMessage"
"summary_message": "#/components/schemas/SummaryMessage",
"event_message": "#/components/schemas/EventMessage"
}
}
},
@@ -30454,6 +30454,12 @@
"title": "Override Model",
"description": "Model handle to use for this request instead of the agent's default model. This allows sending a message to a different model without changing the agent's configuration."
},
"include_compaction_messages": {
"type": "boolean",
"title": "Include Compaction Messages",
"description": "If True, compaction events emit structured `SummaryMessage` and `EventMessage` types. If False (default), compaction messages are not included in the response.",
"default": false
},
"streaming": {
"type": "boolean",
"title": "Streaming",
@@ -32400,7 +32406,7 @@
},
"message_type": {
"type": "string",
"const": "event",
"const": "event_message",
"title": "Message Type",
"default": "event_message"
},
@@ -36625,6 +36631,12 @@
"title": "Override Model",
"description": "Model handle to use for this request instead of the agent's default model. This allows sending a message to a different model without changing the agent's configuration."
},
"include_compaction_messages": {
"type": "boolean",
"title": "Include Compaction Messages",
"description": "If True, compaction events emit structured `SummaryMessage` and `EventMessage` types. If False (default), compaction messages are not included in the response.",
"default": false
},
"callback_url": {
"anyOf": [
{
@@ -36811,6 +36823,12 @@
"title": "Override Model",
"description": "Model handle to use for this request instead of the agent's default model. This allows sending a message to a different model without changing the agent's configuration."
},
"include_compaction_messages": {
"type": "boolean",
"title": "Include Compaction Messages",
"description": "If True, compaction events emit structured `SummaryMessage` and `EventMessage` types. If False (default), compaction messages are not included in the response.",
"default": false
},
"agent_id": {
"type": "string",
"maxLength": 42,
@@ -37176,6 +37194,12 @@
],
"title": "Override Model",
"description": "Model handle to use for this request instead of the agent's default model. This allows sending a message to a different model without changing the agent's configuration."
},
"include_compaction_messages": {
"type": "boolean",
"title": "Include Compaction Messages",
"description": "If True, compaction events emit structured `SummaryMessage` and `EventMessage` types. If False (default), compaction messages are not included in the response.",
"default": false
}
},
"type": "object",
@@ -37418,6 +37442,12 @@
"title": "Override Model",
"description": "Model handle to use for this request instead of the agent's default model. This allows sending a message to a different model without changing the agent's configuration."
},
"include_compaction_messages": {
"type": "boolean",
"title": "Include Compaction Messages",
"description": "If True, compaction events emit structured `SummaryMessage` and `EventMessage` types. If False (default), compaction messages are not included in the response.",
"default": false
},
"streaming": {
"type": "boolean",
"title": "Streaming",
@@ -38396,7 +38426,15 @@
},
"MessageRole": {
"type": "string",
"enum": ["assistant", "user", "tool", "function", "system", "approval"],
"enum": [
"assistant",
"user",
"tool",
"function",
"system",
"approval",
"summary"
],
"title": "MessageRole"
},
"MessageSearchRequest": {
@@ -38579,7 +38617,9 @@
"tool_call_message",
"tool_return_message",
"approval_request_message",
"approval_response_message"
"approval_response_message",
"summary_message",
"event_message"
],
"title": "MessageType"
},
@@ -42959,7 +42999,7 @@
},
"message_type": {
"type": "string",
"const": "summary",
"const": "summary_message",
"title": "Message Type",
"default": "summary_message"
},
@@ -48487,8 +48527,8 @@
"assistant_message": "#/components/schemas/AssistantMessage",
"approval_request_message": "#/components/schemas/ApprovalRequestMessage",
"approval_response_message": "#/components/schemas/ApprovalResponseMessage",
"summary": "#/components/schemas/SummaryMessage",
"event": "#/components/schemas/EventMessage"
"summary_message": "#/components/schemas/SummaryMessage",
"event_message": "#/components/schemas/EventMessage"
}
}
},

View File

@@ -46,6 +46,7 @@ class BaseAgentV2(ABC):
include_return_message_types: list[MessageType] | None = None,
request_start_timestamp_ns: int | None = None,
client_tools: list["ClientToolSchema"] | None = None,
include_compaction_messages: bool = False, # Not used in V2, but accepted for API compatibility
) -> LettaResponse:
"""
Execute the agent loop in blocking mode, returning all messages at once.
@@ -53,6 +54,7 @@ class BaseAgentV2(ABC):
Args:
client_tools: Optional list of client-side tools. When called, execution pauses
for client to provide tool returns.
include_compaction_messages: Not used in V2, but accepted for API compatibility.
"""
raise NotImplementedError
@@ -66,8 +68,9 @@ class BaseAgentV2(ABC):
use_assistant_message: bool = True,
include_return_message_types: list[MessageType] | None = None,
request_start_timestamp_ns: int | None = None,
conversation_id: str | None = None,
conversation_id: str | None = None,
client_tools: list["ClientToolSchema"] | None = None,
include_compaction_messages: bool = False, # Not used in V2, but accepted for API compatibility
) -> AsyncGenerator[LettaMessage | LegacyLettaMessage | MessageStreamStatus, None]:
"""
Execute the agent loop in streaming mode, yielding chunks as they become available.
@@ -78,5 +81,6 @@ class BaseAgentV2(ABC):
Args:
client_tools: Optional list of client-side tools. When called, execution pauses
for client to provide tool returns.
include_compaction_messages: Not used in V2, but accepted for API compatibility.
"""
raise NotImplementedError

View File

@@ -181,6 +181,7 @@ class LettaAgentV2(BaseAgentV2):
include_return_message_types: list[MessageType] | None = None,
request_start_timestamp_ns: int | None = None,
client_tools: list[ClientToolSchema] | None = None,
include_compaction_messages: bool = False, # Not used in V2, but accepted for API compatibility
) -> LettaResponse:
"""
Execute the agent loop in blocking mode, returning all messages at once.
@@ -193,6 +194,7 @@ class LettaAgentV2(BaseAgentV2):
include_return_message_types: Filter for which message types to return
request_start_timestamp_ns: Start time for tracking request duration
client_tools: Optional list of client-side tools (not used in V2, for API compatibility)
include_compaction_messages: Not used in V2, but accepted for API compatibility.
Returns:
LettaResponse: Complete response with all messages and metadata
@@ -271,6 +273,7 @@ class LettaAgentV2(BaseAgentV2):
request_start_timestamp_ns: int | None = None,
conversation_id: str | None = None, # Not used in V2, but accepted for API compatibility
client_tools: list[ClientToolSchema] | None = None,
include_compaction_messages: bool = False, # Not used in V2, but accepted for API compatibility
) -> AsyncGenerator[str, None]:
"""
Execute the agent loop in streaming mode, yielding chunks as they become available.
@@ -289,6 +292,7 @@ class LettaAgentV2(BaseAgentV2):
include_return_message_types: Filter for which message types to return
request_start_timestamp_ns: Start time for tracking request duration
client_tools: Optional list of client-side tools (not used in V2, for API compatibility)
include_compaction_messages: Not used in V2, but accepted for API compatibility.
Yields:
str: JSON-formatted SSE data chunks for each completed step

View File

@@ -29,7 +29,7 @@ from letta.local_llm.constants import INNER_THOUGHTS_KWARG
from letta.otel.tracing import trace_method
from letta.schemas.agent import AgentState
from letta.schemas.enums import MessageRole
from letta.schemas.letta_message import ApprovalReturn, LettaErrorMessage, LettaMessage, MessageType
from letta.schemas.letta_message import ApprovalReturn, EventMessage, LettaErrorMessage, LettaMessage, MessageType, SummaryMessage
from letta.schemas.letta_message_content import OmittedReasoningContent, ReasoningContent, RedactedReasoningContent, TextContent
from letta.schemas.letta_request import ClientToolSchema
from letta.schemas.letta_response import LettaResponse
@@ -120,6 +120,7 @@ class LettaAgentV3(LettaAgentV2):
request_start_timestamp_ns: int | None = None,
conversation_id: str | None = None,
client_tools: list[ClientToolSchema] | None = None,
include_compaction_messages: bool = False,
) -> LettaResponse:
"""
Execute the agent loop in blocking mode, returning all messages at once.
@@ -134,6 +135,8 @@ class LettaAgentV3(LettaAgentV2):
conversation_id: Optional conversation ID for conversation-scoped messaging
client_tools: Optional list of client-side tools. When called, execution pauses
for client to provide tool returns.
include_compaction_messages: Whether to include SummaryMessage/EventMessage in response
and use role=summary for stored summary messages.
Returns:
LettaResponse: Complete response with all messages and metadata
@@ -191,6 +194,7 @@ class LettaAgentV3(LettaAgentV2):
# use_assistant_message=use_assistant_message,
include_return_message_types=include_return_message_types,
request_start_timestamp_ns=request_start_timestamp_ns,
include_compaction_messages=include_compaction_messages,
)
input_messages_to_persist = [] # clear after first step
@@ -283,6 +287,7 @@ class LettaAgentV3(LettaAgentV2):
request_start_timestamp_ns: int | None = None,
conversation_id: str | None = None,
client_tools: list[ClientToolSchema] | None = None,
include_compaction_messages: bool = False,
) -> AsyncGenerator[str, None]:
"""
Execute the agent loop in streaming mode, yielding chunks as they become available.
@@ -372,6 +377,7 @@ class LettaAgentV3(LettaAgentV2):
# use_assistant_message=use_assistant_message,
include_return_message_types=include_return_message_types,
request_start_timestamp_ns=request_start_timestamp_ns,
include_compaction_messages=include_compaction_messages,
)
input_messages_to_persist = [] # clear after first step
async for chunk in response:
@@ -556,6 +562,73 @@ class LettaAgentV3(LettaAgentV2):
self.in_context_messages = in_context_messages # update in-memory state
def _create_compaction_event_message(
self,
step_id: str | None,
run_id: str | None,
trigger: str,
) -> EventMessage:
"""
Create an EventMessage to notify the client that compaction is starting.
Args:
step_id: The current step ID
run_id: The current run ID
trigger: The trigger that caused compaction (e.g., "context_window_exceeded", "post_step_context_check")
Returns:
EventMessage to yield before compaction starts
"""
return EventMessage(
id=str(uuid.uuid4()),
date=get_utc_time(),
event_type="compaction",
event_data={
"trigger": trigger,
"context_token_estimate": self.context_token_estimate,
"context_window": self.agent_state.llm_config.context_window,
},
run_id=run_id,
step_id=step_id,
)
def _create_summary_result_message(
self,
summary_message: Message,
summary_text: str,
step_id: str | None,
run_id: str | None,
include_compaction_messages: bool,
) -> list[LettaMessage]:
"""
Create the summary message to yield to the client after compaction completes.
Args:
summary_message: The persisted summary Message object
summary_text: The raw summary text (unpacked)
step_id: The current step ID
run_id: The current run ID
include_compaction_messages: If True, return SummaryMessage; if False, return UserMessage
Returns:
List of LettaMessage objects to yield to the client
"""
if include_compaction_messages:
# New behavior: structured SummaryMessage
return [
SummaryMessage(
id=summary_message.id,
date=summary_message.created_at,
summary=summary_text,
otid=Message.generate_otid_from_id(summary_message.id, 0),
step_id=step_id,
run_id=run_id,
),
]
else:
# Old behavior: UserMessage with packed JSON
return list(Message.to_letta_messages(summary_message))
@trace_method
async def _step(
self,
@@ -569,6 +642,7 @@ class LettaAgentV3(LettaAgentV2):
remaining_turns: int = -1,
dry_run: bool = False,
enforce_run_id_set: bool = True,
include_compaction_messages: bool = False,
) -> AsyncGenerator[LettaMessage | dict, None]:
"""
Execute a single agent step (one LLM call and tool execution).
@@ -790,16 +864,43 @@ class LettaAgentV3(LettaAgentV2):
self.logger.info(
f"Context window exceeded (error {e}), trying to compact messages attempt {llm_request_attempt + 1} of {summarizer_settings.max_summarizer_retries + 1}"
)
# checkpoint summarized messages
# TODO: might want to delay this checkpoint in case of corrupted state
try:
summary_message, messages, _ = await self.compact(
# Yield event notification before compaction starts
if include_compaction_messages:
yield self._create_compaction_event_message(
step_id=step_id,
run_id=run_id,
trigger="context_window_exceeded",
)
summary_message, messages, summary_text = await self.compact(
messages,
trigger_threshold=self.agent_state.llm_config.context_window,
run_id=run_id,
step_id=step_id,
use_summary_role=include_compaction_messages,
)
self.logger.info("Summarization succeeded, continuing to retry LLM request")
# Persist the summary message
self.response_messages.append(summary_message)
await self._checkpoint_messages(
run_id=run_id,
step_id=step_id,
new_messages=[summary_message],
in_context_messages=messages,
)
# Yield summary result message to client
for msg in self._create_summary_result_message(
summary_message=summary_message,
summary_text=summary_text,
step_id=step_id,
run_id=run_id,
include_compaction_messages=include_compaction_messages,
):
yield msg
continue
except SystemPromptTokenExceededError:
self.stop_reason = LettaStopReason(
@@ -811,14 +912,6 @@ class LettaAgentV3(LettaAgentV2):
self.logger.error(f"Unknown error occured for summarization run {run_id}: {e}")
raise e
# update the messages
await self._checkpoint_messages(
run_id=run_id,
step_id=step_id,
new_messages=[summary_message],
in_context_messages=messages,
)
else:
self.stop_reason = LettaStopReason(stop_reason=StopReasonType.error.value)
self.logger.error(f"Unknown error occured for run {run_id}: {e}")
@@ -919,17 +1012,34 @@ class LettaAgentV3(LettaAgentV2):
self.logger.info(
f"Context window exceeded (current: {self.context_token_estimate}, threshold: {self.agent_state.llm_config.context_window}), trying to compact messages"
)
summary_message, messages, _ = await self.compact(
# Yield event notification before compaction starts
if include_compaction_messages:
yield self._create_compaction_event_message(
step_id=step_id,
run_id=run_id,
trigger="post_step_context_check",
)
summary_message, messages, summary_text = await self.compact(
messages,
trigger_threshold=self.agent_state.llm_config.context_window,
run_id=run_id,
step_id=step_id,
use_summary_role=include_compaction_messages,
)
# TODO: persist + return the summary message
# TODO: convert this to a SummaryMessage
self.response_messages.append(summary_message)
for message in Message.to_letta_messages(summary_message):
yield message
# Yield summary result message to client
for msg in self._create_summary_result_message(
summary_message=summary_message,
summary_text=summary_text,
step_id=step_id,
run_id=run_id,
include_compaction_messages=include_compaction_messages,
):
yield msg
await self._checkpoint_messages(
run_id=run_id,
step_id=step_id,
@@ -1501,6 +1611,7 @@ class LettaAgentV3(LettaAgentV2):
compaction_settings: Optional["CompactionSettings"] = None,
run_id: Optional[str] = None,
step_id: Optional[str] = None,
use_summary_role: bool = False,
) -> tuple[Message, list[Message], str]:
"""Compact the current in-context messages for this agent.
@@ -1508,6 +1619,11 @@ class LettaAgentV3(LettaAgentV2):
``compaction_settings.model`` when provided. This mirrors how agent
creation derives defaults from provider-specific ModelSettings, but is
localized to summarization.
Args:
use_summary_role: If True, the summary message will be created with
role=summary instead of role=user. This enables first-class
summary message handling in the database and API responses.
"""
# Use the passed-in compaction_settings first, then agent's compaction_settings if set,
@@ -1630,23 +1746,36 @@ class LettaAgentV3(LettaAgentV2):
summary=summary,
timezone=self.agent_state.timezone,
)
summary_messages = await convert_message_creates_to_messages(
message_creates=[
MessageCreate(
role=MessageRole.user,
content=[TextContent(text=summary_message_str_packed)],
)
],
agent_id=self.agent_state.id,
timezone=self.agent_state.timezone,
# We already packed, don't pack again
wrap_user_message=False,
wrap_system_message=False,
run_id=None, # TODO: add this
)
if not len(summary_messages) == 1:
self.logger.error(f"Expected only one summary message, got {len(summary_messages)} in {summary_messages}")
summary_message_obj = summary_messages[0]
if use_summary_role:
# New behavior: Create Message directly with role=summary
# (bypassing MessageCreate which only accepts user/system/assistant roles)
summary_message_obj = Message(
role=MessageRole.summary,
content=[TextContent(text=summary_message_str_packed)],
agent_id=self.agent_state.id,
run_id=run_id,
step_id=step_id,
)
else:
# Legacy behavior: Use convert_message_creates_to_messages with role=user
summary_messages = await convert_message_creates_to_messages(
message_creates=[
MessageCreate(
role=MessageRole.user,
content=[TextContent(text=summary_message_str_packed)],
)
],
agent_id=self.agent_state.id,
timezone=self.agent_state.timezone,
# We already packed, don't pack again
wrap_user_message=False,
wrap_system_message=False,
run_id=run_id,
)
if not len(summary_messages) == 1:
self.logger.error(f"Expected only one summary message, got {len(summary_messages)} in {summary_messages}")
summary_message_obj = summary_messages[0]
# final messages: inject summarization message at the beginning
final_messages = [compacted_messages[0]] + [summary_message_obj]

View File

@@ -47,6 +47,7 @@ class SleeptimeMultiAgentV3(LettaAgentV2):
include_return_message_types: list[MessageType] | None = None,
request_start_timestamp_ns: int | None = None,
client_tools: list[ClientToolSchema] | None = None,
include_compaction_messages: bool = False,
) -> LettaResponse:
self.run_ids = []
@@ -61,6 +62,7 @@ class SleeptimeMultiAgentV3(LettaAgentV2):
include_return_message_types=include_return_message_types,
request_start_timestamp_ns=request_start_timestamp_ns,
client_tools=client_tools,
include_compaction_messages=include_compaction_messages,
)
await self.run_sleeptime_agents()
@@ -79,6 +81,7 @@ class SleeptimeMultiAgentV3(LettaAgentV2):
request_start_timestamp_ns: int | None = None,
include_return_message_types: list[MessageType] | None = None,
client_tools: list[ClientToolSchema] | None = None,
include_compaction_messages: bool = False,
) -> AsyncGenerator[str, None]:
self.run_ids = []
@@ -96,6 +99,7 @@ class SleeptimeMultiAgentV3(LettaAgentV2):
include_return_message_types=include_return_message_types,
request_start_timestamp_ns=request_start_timestamp_ns,
client_tools=client_tools,
include_compaction_messages=include_compaction_messages,
):
yield chunk
finally:

View File

@@ -48,6 +48,7 @@ class SleeptimeMultiAgentV4(LettaAgentV3):
request_start_timestamp_ns: int | None = None,
conversation_id: str | None = None,
client_tools: list[ClientToolSchema] | None = None,
include_compaction_messages: bool = False,
) -> LettaResponse:
self.run_ids = []
@@ -63,6 +64,7 @@ class SleeptimeMultiAgentV4(LettaAgentV3):
request_start_timestamp_ns=request_start_timestamp_ns,
conversation_id=conversation_id,
client_tools=client_tools,
include_compaction_messages=include_compaction_messages,
)
run_ids = await self.run_sleeptime_agents()
@@ -81,6 +83,7 @@ class SleeptimeMultiAgentV4(LettaAgentV3):
include_return_message_types: list[MessageType] | None = None,
conversation_id: str | None = None,
client_tools: list[ClientToolSchema] | None = None,
include_compaction_messages: bool = False,
) -> AsyncGenerator[str, None]:
self.run_ids = []
@@ -99,6 +102,7 @@ class SleeptimeMultiAgentV4(LettaAgentV3):
request_start_timestamp_ns=request_start_timestamp_ns,
conversation_id=conversation_id,
client_tools=client_tools,
include_compaction_messages=include_compaction_messages,
):
yield chunk
finally:

View File

@@ -103,6 +103,7 @@ class MessageRole(str, Enum):
function = "function"
system = "system"
approval = "approval"
summary = "summary"
class MessageSourceType(str, Enum):

View File

@@ -61,6 +61,8 @@ class MessageType(str, Enum):
tool_return_message = "tool_return_message"
approval_request_message = "approval_request_message"
approval_response_message = "approval_response_message"
summary_message = "summary_message"
event_message = "event_message"
class LettaMessage(BaseModel):
@@ -399,7 +401,7 @@ class SummaryMessage(LettaMessage):
A message representing a summary of the conversation. Sent to the LLM as a user or system message depending on the provider.
"""
message_type: Literal["summary"] = "summary_message"
message_type: Literal["summary_message"] = "summary_message"
summary: str
@@ -408,7 +410,7 @@ class EventMessage(LettaMessage):
A message for notifying the developer that an event has occurred (e.g. a compaction). Events are NOT part of the context window.
"""
message_type: Literal["event"] = "event_message"
message_type: Literal["event_message"] = "event_message"
event_type: Literal["compaction"]
event_data: dict
@@ -459,8 +461,8 @@ def create_letta_message_union_schema():
"assistant_message": "#/components/schemas/AssistantMessage",
"approval_request_message": "#/components/schemas/ApprovalRequestMessage",
"approval_response_message": "#/components/schemas/ApprovalResponseMessage",
"summary": "#/components/schemas/SummaryMessage",
"event": "#/components/schemas/EventMessage",
"summary_message": "#/components/schemas/SummaryMessage",
"event_message": "#/components/schemas/EventMessage",
},
},
}

View File

@@ -73,6 +73,13 @@ class LettaRequest(BaseModel):
"This allows sending a message to a different model without changing the agent's configuration.",
)
# Compaction message format
include_compaction_messages: bool = Field(
default=False,
description="If True, compaction events emit structured `SummaryMessage` and `EventMessage` types. "
"If False (default), compaction messages are not included in the response.",
)
@field_validator("messages", mode="before")
@classmethod
def add_default_type_to_messages(cls, v):

View File

@@ -37,6 +37,7 @@ from letta.schemas.letta_message import (
MessageType,
ReasoningMessage,
ReasoningMessageListResult,
SummaryMessage,
SystemMessage,
SystemMessageListResult,
ToolCall,
@@ -290,7 +291,7 @@ class Message(BaseMessage):
@field_validator("role")
@classmethod
def validate_role(cls, v: str) -> str:
roles = ["system", "assistant", "user", "tool", "approval"]
roles = ["system", "assistant", "user", "tool", "approval", "summary"]
assert v in roles, f"Role must be one of {roles}"
return v
@@ -320,6 +321,7 @@ class Message(BaseMessage):
reverse: bool = True,
include_err: Optional[bool] = None,
text_is_assistant_message: bool = False,
convert_summary_to_user: bool = True,
) -> List[LettaMessage]:
if use_assistant_message:
message_ids_to_remove = []
@@ -352,6 +354,7 @@ class Message(BaseMessage):
reverse=reverse,
include_err=include_err,
text_is_assistant_message=text_is_assistant_message,
convert_summary_to_user=convert_summary_to_user,
)
]
@@ -365,6 +368,7 @@ class Message(BaseMessage):
reverse: bool = True,
include_err: Optional[bool] = None,
text_is_assistant_message: bool = False,
convert_summary_to_user: bool = True,
) -> List[LettaMessageSearchResult]:
"""Convert MessageSearchResult objects into LettaMessageSearchResult objects.
@@ -385,6 +389,7 @@ class Message(BaseMessage):
reverse=reverse,
include_err=include_err,
text_is_assistant_message=text_is_assistant_message,
convert_summary_to_user=convert_summary_to_user,
)
for lm in letta_messages:
@@ -445,8 +450,14 @@ class Message(BaseMessage):
reverse: bool = True,
include_err: Optional[bool] = None,
text_is_assistant_message: bool = False,
convert_summary_to_user: bool = True,
) -> List[LettaMessage]:
"""Convert message object (in DB format) to the style used by the original Letta API"""
"""Convert message object (in DB format) to the style used by the original Letta API
Args:
convert_summary_to_user: If True (default), summary messages are returned as UserMessage
for backward compatibility. If False, return as SummaryMessage.
"""
messages = []
if self.role == MessageRole.assistant:
@@ -468,6 +479,8 @@ class Message(BaseMessage):
messages.append(self._convert_user_message())
elif self.role == MessageRole.system:
messages.append(self._convert_system_message())
elif self.role == MessageRole.summary:
messages.append(self._convert_summary_message(as_user_message=convert_summary_to_user))
elif self.role == MessageRole.approval:
if self.content:
messages.extend(self._convert_reasoning_messages(text_is_assistant_message=text_is_assistant_message))
@@ -1036,6 +1049,45 @@ class Message(BaseMessage):
run_id=self.run_id,
)
def _convert_summary_message(self, as_user_message: bool = True) -> Union[SummaryMessage, UserMessage]:
"""Convert summary role message to SummaryMessage or UserMessage.
Args:
as_user_message: If True, return UserMessage for backward compatibility with
clients that don't support SummaryMessage. If False, return SummaryMessage.
"""
if self.content and len(self.content) == 1 and isinstance(self.content[0], TextContent):
text_content = self.content[0].text
else:
raise ValueError(f"Invalid summary message (no text object on message): {self.content}")
# Unpack the summary from the packed JSON format
# The packed format is: {"type": "system_alert", "message": "...", "time": "..."}
summary = unpack_message(text_content)
if as_user_message:
# Return as UserMessage for backward compatibility
return UserMessage(
id=self.id,
date=self.created_at,
content=summary,
name=self.name,
otid=self.otid,
sender_id=self.sender_id,
step_id=self.step_id,
is_err=self.is_err,
run_id=self.run_id,
)
else:
return SummaryMessage(
id=self.id,
date=self.created_at,
summary=summary,
otid=self.otid,
step_id=self.step_id,
run_id=self.run_id,
)
@staticmethod
def dict_to_message(
agent_id: str,
@@ -1297,6 +1349,14 @@ class Message(BaseMessage):
"role": self.role,
}
elif self.role == "summary":
# Summary messages are converted to user messages (same as current system_alert behavior)
assert text_content is not None, vars(self)
openai_message = {
"content": text_content,
"role": "user",
}
elif self.role == "assistant" or self.role == "approval":
try:
assert self.tool_calls is not None or text_content is not None, vars(self)
@@ -1474,6 +1534,16 @@ class Message(BaseMessage):
message_dicts.append(user_dict)
elif self.role == "summary":
# Summary messages are converted to user messages (same as current system_alert behavior)
assert self.content and len(self.content) == 1 and isinstance(self.content[0], TextContent), vars(self)
message_dicts.append(
{
"role": "user",
"content": self.content[0].text,
}
)
elif self.role == "assistant" or self.role == "approval":
# Validate that message has content OpenAI Responses API can process
if self.tool_calls is None and (self.content is None or len(self.content) == 0):
@@ -1793,6 +1863,14 @@ class Message(BaseMessage):
"role": self.role,
}
elif self.role == "summary":
# Summary messages are converted to user messages (same as current system_alert behavior)
assert text_content is not None, vars(self)
anthropic_message = {
"content": text_content,
"role": "user",
}
elif self.role == "assistant" or self.role == "approval":
# Validate that message has content Anthropic API can process
if self.tool_calls is None and (self.content is None or len(self.content) == 0):
@@ -2053,6 +2131,14 @@ class Message(BaseMessage):
"parts": content_parts,
}
elif self.role == "summary":
# Summary messages are converted to user messages (same as current system_alert behavior)
assert text_content is not None, vars(self)
google_ai_message = {
"role": "user",
"parts": [{"text": text_content}],
}
elif self.role == "assistant" or self.role == "approval":
# Validate that message has content Google API can process
if self.tool_calls is None and text_content is None and len(self.content) <= 1:
@@ -2290,6 +2376,10 @@ class Message(BaseMessage):
return self.role == "approval" and self.tool_calls is None and self.approve is not None
def is_summarization_message(self) -> bool:
# First-class summary role (new format)
if self.role == "summary":
return True
# Legacy format: user message with system_alert content
return (
self.role == "user"
and self.content is not None

View File

@@ -1608,6 +1608,7 @@ async def send_message(
request_start_timestamp_ns=request_start_timestamp_ns,
include_return_message_types=request.include_return_message_types,
client_tools=request.client_tools,
include_compaction_messages=request.include_compaction_messages,
)
else:
result = await server.send_message_to_agent(
@@ -1820,6 +1821,7 @@ async def _process_message_background(
max_steps: int = DEFAULT_MAX_STEPS,
include_return_message_types: list[MessageType] | None = None,
override_model: str | None = None,
include_compaction_messages: bool = False,
) -> None:
"""Background task to process the message and update run status."""
request_start_timestamp_ns = get_utc_timestamp_ns()
@@ -1866,6 +1868,7 @@ async def _process_message_background(
use_assistant_message=use_assistant_message,
request_start_timestamp_ns=request_start_timestamp_ns,
include_return_message_types=include_return_message_types,
include_compaction_messages=include_compaction_messages,
)
else:
result = await server.send_message_to_agent(
@@ -2048,6 +2051,7 @@ async def send_message_async(
max_steps=request.max_steps,
include_return_message_types=request.include_return_message_types,
override_model=request.override_model,
include_compaction_messages=request.include_compaction_messages,
),
label=f"process_message_background_{run.id}",
)

View File

@@ -265,6 +265,7 @@ async def send_conversation_message(
include_return_message_types=request.include_return_message_types,
client_tools=request.client_tools,
conversation_id=conversation_id,
include_compaction_messages=request.include_compaction_messages,
)

View File

@@ -150,6 +150,7 @@ class StreamingService:
actor=actor,
conversation_id=conversation_id,
client_tools=request.client_tools,
include_compaction_messages=request.include_compaction_messages,
)
# handle background streaming if requested
@@ -326,6 +327,7 @@ class StreamingService:
actor: User,
conversation_id: Optional[str] = None,
client_tools: Optional[list[ClientToolSchema]] = None,
include_compaction_messages: bool = False,
) -> AsyncIterator:
"""
Create a stream with unified error handling.
@@ -354,6 +356,7 @@ class StreamingService:
include_return_message_types=include_return_message_types,
conversation_id=conversation_id,
client_tools=client_tools,
include_compaction_messages=include_compaction_messages,
)
async for chunk in stream:

View File

@@ -20,7 +20,7 @@ from letta.schemas.agent import CreateAgent, UpdateAgent
from letta.schemas.block import BlockUpdate, CreateBlock
from letta.schemas.embedding_config import EmbeddingConfig
from letta.schemas.enums import MessageRole
from letta.schemas.letta_message import LettaMessage
from letta.schemas.letta_message import EventMessage, LettaMessage, SummaryMessage
from letta.schemas.letta_message_content import TextContent, ToolCallContent, ToolReturnContent
from letta.schemas.llm_config import LLMConfig
from letta.schemas.message import Message as PydanticMessage, MessageCreate
@@ -725,7 +725,7 @@ async def test_summarize_with_mode(server: SyncServer, actor, llm_config: LLMCon
agent_loop = LettaAgentV3(agent_state=agent_state, actor=actor)
summary, result, _ = await agent_loop.compact(messages=in_context_messages)
summary, result, summary_text = await agent_loop.compact(messages=in_context_messages)
assert isinstance(result, list)
@@ -734,10 +734,16 @@ async def test_summarize_with_mode(server: SyncServer, actor, llm_config: LLMCon
assert hasattr(msg, "role")
assert hasattr(msg, "content")
# Verify the summary text (third return value) is a non-empty string.
# This is used by the agent loop to construct a SummaryMessage for clients.
assert isinstance(summary_text, str), f"Expected summary_text to be a string, got {type(summary_text)}"
assert len(summary_text) > 0, "Expected non-empty summary text"
print()
print(f"RESULTS {mode} ======")
for msg in result:
print(f"MSG: {msg}")
print(f"SUMMARY TEXT: {summary_text[:200]}...")
print()
@@ -759,6 +765,87 @@ async def test_summarize_with_mode(server: SyncServer, actor, llm_config: LLMCon
assert result[1].role == MessageRole.user
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "llm_config",
    TESTED_LLM_CONFIGS,
    ids=[c.model for c in TESTED_LLM_CONFIGS],
)
async def test_compact_returns_valid_summary_message_and_event_message(server: SyncServer, actor, llm_config: LLMConfig):
    """
    Verify compact()'s return values are sufficient to build client-facing
    SummaryMessage and EventMessage objects.

    This pins the contract that _step() depends on: compact() returns
    (summary_message_obj, compacted_messages, summary_text), where
    summary_text becomes a SummaryMessage body and the compaction metadata
    populates an EventMessage.
    """
    import uuid

    from letta.helpers.datetime_helpers import get_utc_time

    # Seed a conversation long enough to summarize: one system prompt
    # followed by ten user/assistant exchange pairs.
    convo = [
        PydanticMessage(
            role=MessageRole.system,
            content=[TextContent(type="text", text="You are a helpful assistant.")],
        )
    ]
    for idx in range(10):
        convo.extend(
            [
                PydanticMessage(
                    role=MessageRole.user,
                    content=[TextContent(type="text", text=f"User message {idx}: Test message {idx}.")],
                ),
                PydanticMessage(
                    role=MessageRole.assistant,
                    content=[TextContent(type="text", text=f"Assistant response {idx}: Acknowledged message {idx}.")],
                ),
            ]
        )

    agent_state, in_context_messages = await create_agent_with_messages(server, actor, llm_config, convo)
    handle = llm_config.handle or f"{llm_config.model_endpoint_type}/{llm_config.model}"
    agent_state.compaction_settings = CompactionSettings(model=handle, mode="all")

    agent_loop = LettaAgentV3(agent_state=agent_state, actor=actor)
    summary_message_obj, compacted_messages, summary_text = await agent_loop.compact(messages=in_context_messages)

    # The summary text plus the persisted message's id/timestamp must be
    # enough to construct a valid client-facing SummaryMessage.
    client_summary = SummaryMessage(
        id=summary_message_obj.id,
        date=summary_message_obj.created_at,
        summary=summary_text,
        otid=PydanticMessage.generate_otid_from_id(summary_message_obj.id, 0),
        step_id=None,
        run_id=None,
    )
    assert client_summary.message_type == "summary_message"
    assert isinstance(client_summary.summary, str)
    assert len(client_summary.summary) > 0
    assert client_summary.id == summary_message_obj.id

    # The compaction metadata must be enough to construct a valid
    # client-facing EventMessage for the compaction event.
    compaction_event = EventMessage(
        id=str(uuid.uuid4()),
        date=get_utc_time(),
        event_type="compaction",
        event_data={
            "trigger": "post_step_context_check",
            "context_token_estimate": 1000,
            "context_window": agent_state.llm_config.context_window,
        },
        run_id=None,
        step_id=None,
    )
    assert compaction_event.message_type == "event_message"
    assert compaction_event.event_type == "compaction"
    assert "trigger" in compaction_event.event_data
    assert "context_window" in compaction_event.event_data
@pytest.mark.asyncio
async def test_v3_compact_uses_compaction_settings_model_and_model_settings(server: SyncServer, actor):
"""Integration test: LettaAgentV3.compact uses the LLMConfig implied by CompactionSettings.
@@ -911,7 +998,7 @@ async def test_v3_summarize_hard_eviction_when_still_over_threshold(
caplog.set_level("ERROR")
summary, result, _ = await agent_loop.compact(
summary, result, summary_text = await agent_loop.compact(
messages=in_context_messages,
trigger_threshold=context_limit,
)
@@ -932,6 +1019,10 @@ async def test_v3_summarize_hard_eviction_when_still_over_threshold(
assert result[0].role == MessageRole.system
assert result[1].role == MessageRole.user
# Verify the summary text is returned (used to construct SummaryMessage in the agent loop)
assert isinstance(summary_text, str), f"Expected summary_text to be a string, got {type(summary_text)}"
assert len(summary_text) > 0, "Expected non-empty summary text after hard eviction"
# ======================================================================================================================
# Sliding Window Summarizer Unit Tests