From d28ccc0be6d4a24806077462e2468397c6ad5ed6 Mon Sep 17 00:00:00 2001 From: jnjpng Date: Fri, 30 Jan 2026 11:59:24 -0800 Subject: [PATCH] feat: add summary message and event on compaction (#9144) * base * update * update * revert formatting * routes * legacy * fix * review * update --- fern/openapi.json | 64 ++++-- letta/agents/base_agent_v2.py | 6 +- letta/agents/letta_agent_v2.py | 4 + letta/agents/letta_agent_v3.py | 197 +++++++++++++++--- letta/groups/sleeptime_multi_agent_v3.py | 4 + letta/groups/sleeptime_multi_agent_v4.py | 4 + letta/schemas/enums.py | 1 + letta/schemas/letta_message.py | 10 +- letta/schemas/letta_request.py | 7 + letta/schemas/message.py | 94 ++++++++- letta/server/rest_api/routers/v1/agents.py | 4 + .../rest_api/routers/v1/conversations.py | 1 + letta/services/streaming_service.py | 3 + tests/integration_test_summarizer.py | 97 ++++++++- 14 files changed, 440 insertions(+), 56 deletions(-) diff --git a/fern/openapi.json b/fern/openapi.json index 3f69854f..d86cf7ab 100644 --- a/fern/openapi.json +++ b/fern/openapi.json @@ -7806,8 +7806,8 @@ "assistant_message": "#/components/schemas/AssistantMessage", "approval_request_message": "#/components/schemas/ApprovalRequestMessage", "approval_response_message": "#/components/schemas/ApprovalResponseMessage", - "summary": "#/components/schemas/SummaryMessage", - "event": "#/components/schemas/EventMessage" + "summary_message": "#/components/schemas/SummaryMessage", + "event_message": "#/components/schemas/EventMessage" } }, "title": "Response Modify Message" @@ -9965,8 +9965,8 @@ "assistant_message": "#/components/schemas/AssistantMessage", "approval_request_message": "#/components/schemas/ApprovalRequestMessage", "approval_response_message": "#/components/schemas/ApprovalResponseMessage", - "summary": "#/components/schemas/SummaryMessage", - "event": "#/components/schemas/EventMessage" + "summary_message": "#/components/schemas/SummaryMessage", + "event_message": "#/components/schemas/EventMessage" } }, "title": "Response Modify Group Message" @@ -16537,8 +16537,8 @@ "assistant_message": "#/components/schemas/AssistantMessage", "approval_request_message": "#/components/schemas/ApprovalRequestMessage", "approval_response_message": "#/components/schemas/ApprovalResponseMessage", - "summary": "#/components/schemas/SummaryMessage", - "event": "#/components/schemas/EventMessage" + "summary_message": "#/components/schemas/SummaryMessage", + "event_message": "#/components/schemas/EventMessage" } } }, @@ -30454,6 +30454,12 @@ "title": "Override Model", "description": "Model handle to use for this request instead of the agent's default model. This allows sending a message to a different model without changing the agent's configuration." }, + "include_compaction_messages": { + "type": "boolean", + "title": "Include Compaction Messages", + "description": "If True, compaction events emit structured `SummaryMessage` and `EventMessage` types. If False (default), compaction messages are not included in the response.", + "default": false + }, "streaming": { "type": "boolean", "title": "Streaming", @@ -32400,7 +32406,7 @@ }, "message_type": { "type": "string", - "const": "event", + "const": "event_message", "title": "Message Type", "default": "event_message" }, @@ -36625,6 +36631,12 @@ "title": "Override Model", "description": "Model handle to use for this request instead of the agent's default model. This allows sending a message to a different model without changing the agent's configuration." }, + "include_compaction_messages": { + "type": "boolean", + "title": "Include Compaction Messages", + "description": "If True, compaction events emit structured `SummaryMessage` and `EventMessage` types. If False (default), compaction messages are not included in the response.", + "default": false + }, "callback_url": { "anyOf": [ { @@ -36811,6 +36823,12 @@ "title": "Override Model", "description": "Model handle to use for this request instead of the agent's default model. This allows sending a message to a different model without changing the agent's configuration." }, + "include_compaction_messages": { + "type": "boolean", + "title": "Include Compaction Messages", + "description": "If True, compaction events emit structured `SummaryMessage` and `EventMessage` types. If False (default), compaction messages are not included in the response.", + "default": false + }, "agent_id": { "type": "string", "maxLength": 42, @@ -37176,6 +37194,12 @@ ], "title": "Override Model", "description": "Model handle to use for this request instead of the agent's default model. This allows sending a message to a different model without changing the agent's configuration." + }, + "include_compaction_messages": { + "type": "boolean", + "title": "Include Compaction Messages", + "description": "If True, compaction events emit structured `SummaryMessage` and `EventMessage` types. If False (default), compaction messages are not included in the response.", + "default": false } }, "type": "object", @@ -37418,6 +37442,12 @@ "title": "Override Model", "description": "Model handle to use for this request instead of the agent's default model. This allows sending a message to a different model without changing the agent's configuration." }, + "include_compaction_messages": { + "type": "boolean", + "title": "Include Compaction Messages", + "description": "If True, compaction events emit structured `SummaryMessage` and `EventMessage` types. If False (default), compaction messages are not included in the response.", + "default": false + }, "streaming": { "type": "boolean", "title": "Streaming", @@ -38396,7 +38426,15 @@ }, "MessageRole": { "type": "string", - "enum": ["assistant", "user", "tool", "function", "system", "approval"], + "enum": [ + "assistant", + "user", + "tool", + "function", + "system", + "approval", + "summary" + ], "title": "MessageRole" }, "MessageSearchRequest": { @@ -38579,7 +38617,9 @@ "tool_call_message", "tool_return_message", "approval_request_message", - "approval_response_message" + "approval_response_message", + "summary_message", + "event_message" ], "title": "MessageType" }, @@ -42959,7 +42999,7 @@ }, "message_type": { "type": "string", - "const": "summary", + "const": "summary_message", "title": "Message Type", "default": "summary_message" }, @@ -48487,8 +48527,8 @@ "assistant_message": "#/components/schemas/AssistantMessage", "approval_request_message": "#/components/schemas/ApprovalRequestMessage", "approval_response_message": "#/components/schemas/ApprovalResponseMessage", - "summary": "#/components/schemas/SummaryMessage", - "event": "#/components/schemas/EventMessage" + "summary_message": "#/components/schemas/SummaryMessage", + "event_message": "#/components/schemas/EventMessage" } } }, diff --git a/letta/agents/base_agent_v2.py b/letta/agents/base_agent_v2.py index d66f6f04..df45b176 100644 --- a/letta/agents/base_agent_v2.py +++ b/letta/agents/base_agent_v2.py @@ -46,6 +46,7 @@ class BaseAgentV2(ABC): include_return_message_types: list[MessageType] | None = None, request_start_timestamp_ns: int | None = None, client_tools: list["ClientToolSchema"] | None = None, + include_compaction_messages: bool = False, # Not used in V2, but accepted for API compatibility ) -> LettaResponse: """ Execute the agent loop in blocking mode, returning all messages at once. @@ -53,6 +54,7 @@ class BaseAgentV2(ABC): Args: client_tools: Optional list of client-side tools. When called, execution pauses for client to provide tool returns. + include_compaction_messages: Not used in V2, but accepted for API compatibility. """ raise NotImplementedError @@ -66,8 +68,9 @@ class BaseAgentV2(ABC): use_assistant_message: bool = True, include_return_message_types: list[MessageType] | None = None, request_start_timestamp_ns: int | None = None, - conversation_id: str | None = None, + conversation_id: str | None = None, client_tools: list["ClientToolSchema"] | None = None, + include_compaction_messages: bool = False, # Not used in V2, but accepted for API compatibility ) -> AsyncGenerator[LettaMessage | LegacyLettaMessage | MessageStreamStatus, None]: """ Execute the agent loop in streaming mode, yielding chunks as they become available. @@ -78,5 +81,6 @@ class BaseAgentV2(ABC): Args: client_tools: Optional list of client-side tools. When called, execution pauses for client to provide tool returns. + include_compaction_messages: Not used in V2, but accepted for API compatibility. """ raise NotImplementedError diff --git a/letta/agents/letta_agent_v2.py b/letta/agents/letta_agent_v2.py index 58379c78..efa42849 100644 --- a/letta/agents/letta_agent_v2.py +++ b/letta/agents/letta_agent_v2.py @@ -181,6 +181,7 @@ class LettaAgentV2(BaseAgentV2): include_return_message_types: list[MessageType] | None = None, request_start_timestamp_ns: int | None = None, client_tools: list[ClientToolSchema] | None = None, + include_compaction_messages: bool = False, # Not used in V2, but accepted for API compatibility ) -> LettaResponse: """ Execute the agent loop in blocking mode, returning all messages at once. @@ -193,6 +194,7 @@ class LettaAgentV2(BaseAgentV2): include_return_message_types: Filter for which message types to return request_start_timestamp_ns: Start time for tracking request duration client_tools: Optional list of client-side tools (not used in V2, for API compatibility) + include_compaction_messages: Not used in V2, but accepted for API compatibility. Returns: LettaResponse: Complete response with all messages and metadata @@ -271,6 +273,7 @@ class LettaAgentV2(BaseAgentV2): request_start_timestamp_ns: int | None = None, conversation_id: str | None = None, # Not used in V2, but accepted for API compatibility client_tools: list[ClientToolSchema] | None = None, + include_compaction_messages: bool = False, # Not used in V2, but accepted for API compatibility ) -> AsyncGenerator[str, None]: """ Execute the agent loop in streaming mode, yielding chunks as they become available. @@ -289,6 +292,7 @@ class LettaAgentV2(BaseAgentV2): include_return_message_types: Filter for which message types to return request_start_timestamp_ns: Start time for tracking request duration client_tools: Optional list of client-side tools (not used in V2, for API compatibility) + include_compaction_messages: Not used in V2, but accepted for API compatibility. Yields: str: JSON-formatted SSE data chunks for each completed step diff --git a/letta/agents/letta_agent_v3.py b/letta/agents/letta_agent_v3.py index 0a4ea5c2..0456ae56 100644 --- a/letta/agents/letta_agent_v3.py +++ b/letta/agents/letta_agent_v3.py @@ -29,7 +29,7 @@ from letta.local_llm.constants import INNER_THOUGHTS_KWARG from letta.otel.tracing import trace_method from letta.schemas.agent import AgentState from letta.schemas.enums import MessageRole -from letta.schemas.letta_message import ApprovalReturn, LettaErrorMessage, LettaMessage, MessageType +from letta.schemas.letta_message import ApprovalReturn, EventMessage, LettaErrorMessage, LettaMessage, MessageType, SummaryMessage from letta.schemas.letta_message_content import OmittedReasoningContent, ReasoningContent, RedactedReasoningContent, TextContent from letta.schemas.letta_request import ClientToolSchema from letta.schemas.letta_response import LettaResponse @@ -120,6 +120,7 @@ class LettaAgentV3(LettaAgentV2): request_start_timestamp_ns: int | None = None, conversation_id: str | None = None, client_tools: list[ClientToolSchema] | None = None, + include_compaction_messages: bool = False, ) -> LettaResponse: """ Execute the agent loop in blocking mode, returning all messages at once. @@ -134,6 +135,8 @@ class LettaAgentV3(LettaAgentV2): conversation_id: Optional conversation ID for conversation-scoped messaging client_tools: Optional list of client-side tools. When called, execution pauses for client to provide tool returns. + include_compaction_messages: Whether to include SummaryMessage/EventMessage in response + and use role=summary for stored summary messages. Returns: LettaResponse: Complete response with all messages and metadata @@ -191,6 +194,7 @@ class LettaAgentV3(LettaAgentV2): # use_assistant_message=use_assistant_message, include_return_message_types=include_return_message_types, request_start_timestamp_ns=request_start_timestamp_ns, + include_compaction_messages=include_compaction_messages, ) input_messages_to_persist = [] # clear after first step @@ -283,6 +287,7 @@ class LettaAgentV3(LettaAgentV2): request_start_timestamp_ns: int | None = None, conversation_id: str | None = None, client_tools: list[ClientToolSchema] | None = None, + include_compaction_messages: bool = False, ) -> AsyncGenerator[str, None]: """ Execute the agent loop in streaming mode, yielding chunks as they become available. @@ -372,6 +377,7 @@ class LettaAgentV3(LettaAgentV2): # use_assistant_message=use_assistant_message, include_return_message_types=include_return_message_types, request_start_timestamp_ns=request_start_timestamp_ns, + include_compaction_messages=include_compaction_messages, ) input_messages_to_persist = [] # clear after first step async for chunk in response: @@ -556,6 +562,73 @@ class LettaAgentV3(LettaAgentV2): self.in_context_messages = in_context_messages # update in-memory state + def _create_compaction_event_message( + self, + step_id: str | None, + run_id: str | None, + trigger: str, + ) -> EventMessage: + """ + Create an EventMessage to notify the client that compaction is starting. + + Args: + step_id: The current step ID + run_id: The current run ID + trigger: The trigger that caused compaction (e.g., "context_window_exceeded", "post_step_context_check") + + Returns: + EventMessage to yield before compaction starts + """ + return EventMessage( + id=str(uuid.uuid4()), + date=get_utc_time(), + event_type="compaction", + event_data={ + "trigger": trigger, + "context_token_estimate": self.context_token_estimate, + "context_window": self.agent_state.llm_config.context_window, + }, + run_id=run_id, + step_id=step_id, + ) + + def _create_summary_result_message( + self, + summary_message: Message, + summary_text: str, + step_id: str | None, + run_id: str | None, + include_compaction_messages: bool, + ) -> list[LettaMessage]: + """ + Create the summary message to yield to the client after compaction completes. + + Args: + summary_message: The persisted summary Message object + summary_text: The raw summary text (unpacked) + step_id: The current step ID + run_id: The current run ID + include_compaction_messages: If True, return SummaryMessage; if False, return UserMessage + + Returns: + List of LettaMessage objects to yield to the client + """ + if include_compaction_messages: + # New behavior: structured SummaryMessage + return [ + SummaryMessage( + id=summary_message.id, + date=summary_message.created_at, + summary=summary_text, + otid=Message.generate_otid_from_id(summary_message.id, 0), + step_id=step_id, + run_id=run_id, + ), + ] + else: + # Old behavior: UserMessage with packed JSON + return list(Message.to_letta_messages(summary_message)) + @trace_method async def _step( self, @@ -569,6 +642,7 @@ class LettaAgentV3(LettaAgentV2): remaining_turns: int = -1, dry_run: bool = False, enforce_run_id_set: bool = True, + include_compaction_messages: bool = False, ) -> AsyncGenerator[LettaMessage | dict, None]: """ Execute a single agent step (one LLM call and tool execution). @@ -790,16 +864,43 @@ class LettaAgentV3(LettaAgentV2): self.logger.info( f"Context window exceeded (error {e}), trying to compact messages attempt {llm_request_attempt + 1} of {summarizer_settings.max_summarizer_retries + 1}" ) - # checkpoint summarized messages - # TODO: might want to delay this checkpoint in case of corrupated state try: - summary_message, messages, _ = await self.compact( + # Yield event notification before compaction starts + if include_compaction_messages: + yield self._create_compaction_event_message( + step_id=step_id, + run_id=run_id, + trigger="context_window_exceeded", + ) + + summary_message, messages, summary_text = await self.compact( messages, trigger_threshold=self.agent_state.llm_config.context_window, run_id=run_id, step_id=step_id, + use_summary_role=include_compaction_messages, ) self.logger.info("Summarization succeeded, continuing to retry LLM request") + + # Persist the summary message + self.response_messages.append(summary_message) + await self._checkpoint_messages( + run_id=run_id, + step_id=step_id, + new_messages=[summary_message], + in_context_messages=messages, + ) + + # Yield summary result message to client + for msg in self._create_summary_result_message( + summary_message=summary_message, + summary_text=summary_text, + step_id=step_id, + run_id=run_id, + include_compaction_messages=include_compaction_messages, + ): + yield msg + continue except SystemPromptTokenExceededError: self.stop_reason = LettaStopReason( @@ -811,14 +912,6 @@ class LettaAgentV3(LettaAgentV2): self.logger.error(f"Unknown error occured for summarization run {run_id}: {e}") raise e - # update the messages - await self._checkpoint_messages( - run_id=run_id, - step_id=step_id, - new_messages=[summary_message], - in_context_messages=messages, - ) - else: self.stop_reason = LettaStopReason(stop_reason=StopReasonType.error.value) self.logger.error(f"Unknown error occured for run {run_id}: {e}") @@ -919,17 +1012,34 @@ class LettaAgentV3(LettaAgentV2): self.logger.info( f"Context window exceeded (current: {self.context_token_estimate}, threshold: {self.agent_state.llm_config.context_window}), trying to compact messages" ) - summary_message, messages, _ = await self.compact( + + # Yield event notification before compaction starts + if include_compaction_messages: + yield self._create_compaction_event_message( + step_id=step_id, + run_id=run_id, + trigger="post_step_context_check", + ) + + summary_message, messages, summary_text = await self.compact( messages, trigger_threshold=self.agent_state.llm_config.context_window, run_id=run_id, step_id=step_id, + use_summary_role=include_compaction_messages, ) - # TODO: persist + return the summary message - # TODO: convert this to a SummaryMessage self.response_messages.append(summary_message) - for message in Message.to_letta_messages(summary_message): - yield message + + # Yield summary result message to client + for msg in self._create_summary_result_message( + summary_message=summary_message, + summary_text=summary_text, + step_id=step_id, + run_id=run_id, + include_compaction_messages=include_compaction_messages, + ): + yield msg + await self._checkpoint_messages( run_id=run_id, step_id=step_id, @@ -1501,6 +1611,7 @@ class LettaAgentV3(LettaAgentV2): compaction_settings: Optional["CompactionSettings"] = None, run_id: Optional[str] = None, step_id: Optional[str] = None, + use_summary_role: bool = False, ) -> tuple[Message, list[Message], str]: """Compact the current in-context messages for this agent. @@ -1508,6 +1619,11 @@ class LettaAgentV3(LettaAgentV2): ``compaction_settings.model`` when provided. This mirrors how agent creation derives defaults from provider-specific ModelSettings, but is localized to summarization. + + Args: + use_summary_role: If True, the summary message will be created with + role=summary instead of role=user. This enables first-class + summary message handling in the database and API responses. """ # Use the passed-in compaction_settings first, then agent's compaction_settings if set, @@ -1630,23 +1746,36 @@ class LettaAgentV3(LettaAgentV2): summary=summary, timezone=self.agent_state.timezone, ) - summary_messages = await convert_message_creates_to_messages( - message_creates=[ - MessageCreate( - role=MessageRole.user, - content=[TextContent(text=summary_message_str_packed)], - ) - ], - agent_id=self.agent_state.id, - timezone=self.agent_state.timezone, - # We already packed, don't pack again - wrap_user_message=False, - wrap_system_message=False, - run_id=None, # TODO: add this - ) - if not len(summary_messages) == 1: - self.logger.error(f"Expected only one summary message, got {len(summary_messages)} in {summary_messages}") - summary_message_obj = summary_messages[0] + + if use_summary_role: + # New behavior: Create Message directly with role=summary + # (bypassing MessageCreate which only accepts user/system/assistant roles) + summary_message_obj = Message( + role=MessageRole.summary, + content=[TextContent(text=summary_message_str_packed)], + agent_id=self.agent_state.id, + run_id=run_id, + step_id=step_id, + ) + else: + # Legacy behavior: Use convert_message_creates_to_messages with role=user + summary_messages = await convert_message_creates_to_messages( + message_creates=[ + MessageCreate( + role=MessageRole.user, + content=[TextContent(text=summary_message_str_packed)], + ) + ], + agent_id=self.agent_state.id, + timezone=self.agent_state.timezone, + # We already packed, don't pack again + wrap_user_message=False, + wrap_system_message=False, + run_id=run_id, + ) + if not len(summary_messages) == 1: + self.logger.error(f"Expected only one summary message, got {len(summary_messages)} in {summary_messages}") + summary_message_obj = summary_messages[0] # final messages: inject summarization message at the beginning final_messages = [compacted_messages[0]] + [summary_message_obj] diff --git a/letta/groups/sleeptime_multi_agent_v3.py b/letta/groups/sleeptime_multi_agent_v3.py index 6c4ea830..080f3abe 100644 --- a/letta/groups/sleeptime_multi_agent_v3.py +++ b/letta/groups/sleeptime_multi_agent_v3.py @@ -47,6 +47,7 @@ class SleeptimeMultiAgentV3(LettaAgentV2): include_return_message_types: list[MessageType] | None = None, request_start_timestamp_ns: int | None = None, client_tools: list[ClientToolSchema] | None = None, + include_compaction_messages: bool = False, ) -> LettaResponse: self.run_ids = [] @@ -61,6 +62,7 @@ class SleeptimeMultiAgentV3(LettaAgentV2): include_return_message_types=include_return_message_types, request_start_timestamp_ns=request_start_timestamp_ns, client_tools=client_tools, + include_compaction_messages=include_compaction_messages, ) await self.run_sleeptime_agents() @@ -79,6 +81,7 @@ class SleeptimeMultiAgentV3(LettaAgentV2): request_start_timestamp_ns: int | None = None, include_return_message_types: list[MessageType] | None = None, client_tools: list[ClientToolSchema] | None = None, + include_compaction_messages: bool = False, ) -> AsyncGenerator[str, None]: self.run_ids = [] @@ -96,6 +99,7 @@ class SleeptimeMultiAgentV3(LettaAgentV2): include_return_message_types=include_return_message_types, request_start_timestamp_ns=request_start_timestamp_ns, client_tools=client_tools, + include_compaction_messages=include_compaction_messages, ): yield chunk finally: diff --git a/letta/groups/sleeptime_multi_agent_v4.py b/letta/groups/sleeptime_multi_agent_v4.py index 85e7ca08..58a161cc 100644 --- a/letta/groups/sleeptime_multi_agent_v4.py +++ b/letta/groups/sleeptime_multi_agent_v4.py @@ -48,6 +48,7 @@ class SleeptimeMultiAgentV4(LettaAgentV3): request_start_timestamp_ns: int | None = None, conversation_id: str | None = None, client_tools: list[ClientToolSchema] | None = None, + include_compaction_messages: bool = False, ) -> LettaResponse: self.run_ids = [] @@ -63,6 +64,7 @@ class SleeptimeMultiAgentV4(LettaAgentV3): request_start_timestamp_ns=request_start_timestamp_ns, conversation_id=conversation_id, client_tools=client_tools, + include_compaction_messages=include_compaction_messages, ) run_ids = await self.run_sleeptime_agents() @@ -81,6 +83,7 @@ class SleeptimeMultiAgentV4(LettaAgentV3): include_return_message_types: list[MessageType] | None = None, conversation_id: str | None = None, client_tools: list[ClientToolSchema] | None = None, + include_compaction_messages: bool = False, ) -> AsyncGenerator[str, None]: self.run_ids = [] @@ -99,6 +102,7 @@ class SleeptimeMultiAgentV4(LettaAgentV3): request_start_timestamp_ns=request_start_timestamp_ns, conversation_id=conversation_id, client_tools=client_tools, + include_compaction_messages=include_compaction_messages, ): yield chunk finally: diff --git a/letta/schemas/enums.py b/letta/schemas/enums.py index 96efb446..b7fc27f0 100644 --- a/letta/schemas/enums.py +++ b/letta/schemas/enums.py @@ -103,6 +103,7 @@ class MessageRole(str, Enum): function = "function" system = "system" approval = "approval" + summary = "summary" class MessageSourceType(str, Enum): diff --git a/letta/schemas/letta_message.py b/letta/schemas/letta_message.py index b90628dd..fde08f5a 100644 --- a/letta/schemas/letta_message.py +++ b/letta/schemas/letta_message.py @@ -61,6 +61,8 @@ class MessageType(str, Enum): tool_return_message = "tool_return_message" approval_request_message = "approval_request_message" approval_response_message = "approval_response_message" + summary_message = "summary_message" + event_message = "event_message" class LettaMessage(BaseModel): @@ -399,7 +401,7 @@ class SummaryMessage(LettaMessage): A message representing a summary of the conversation. Sent to the LLM as a user or system message depending on the provider. """ - message_type: Literal["summary"] = "summary_message" + message_type: Literal["summary_message"] = "summary_message" summary: str @@ -408,7 +410,7 @@ class EventMessage(LettaMessage): A message for notifying the developer that an event that has occured (e.g. a compaction). Events are NOT part of the context window. """ - message_type: Literal["event"] = "event_message" + message_type: Literal["event_message"] = "event_message" event_type: Literal["compaction"] event_data: dict @@ -459,8 +461,8 @@ def create_letta_message_union_schema(): "assistant_message": "#/components/schemas/AssistantMessage", "approval_request_message": "#/components/schemas/ApprovalRequestMessage", "approval_response_message": "#/components/schemas/ApprovalResponseMessage", - "summary": "#/components/schemas/SummaryMessage", - "event": "#/components/schemas/EventMessage", + "summary_message": "#/components/schemas/SummaryMessage", + "event_message": "#/components/schemas/EventMessage", }, }, } diff --git a/letta/schemas/letta_request.py b/letta/schemas/letta_request.py index 9290cc38..e72aa333 100644 --- a/letta/schemas/letta_request.py +++ b/letta/schemas/letta_request.py @@ -73,6 +73,13 @@ class LettaRequest(BaseModel): "This allows sending a message to a different model without changing the agent's configuration.", ) + # Compaction message format + include_compaction_messages: bool = Field( + default=False, + description="If True, compaction events emit structured `SummaryMessage` and `EventMessage` types. " + "If False (default), compaction messages are not included in the response.", + ) + @field_validator("messages", mode="before") @classmethod def add_default_type_to_messages(cls, v): diff --git a/letta/schemas/message.py b/letta/schemas/message.py index 22ed104a..5391dd5b 100644 --- a/letta/schemas/message.py +++ b/letta/schemas/message.py @@ -37,6 +37,7 @@ from letta.schemas.letta_message import ( MessageType, ReasoningMessage, ReasoningMessageListResult, + SummaryMessage, SystemMessage, SystemMessageListResult, ToolCall, @@ -290,7 +291,7 @@ class Message(BaseMessage): @field_validator("role") @classmethod def validate_role(cls, v: str) -> str: - roles = ["system", "assistant", "user", "tool", "approval"] + roles = ["system", "assistant", "user", "tool", "approval", "summary"] assert v in roles, f"Role must be one of {roles}" return v @@ -320,6 +321,7 @@ class Message(BaseMessage): reverse: bool = True, include_err: Optional[bool] = None, text_is_assistant_message: bool = False, + convert_summary_to_user: bool = True, ) -> List[LettaMessage]: if use_assistant_message: message_ids_to_remove = [] @@ -352,6 +354,7 @@ class Message(BaseMessage): reverse=reverse, include_err=include_err, text_is_assistant_message=text_is_assistant_message, + convert_summary_to_user=convert_summary_to_user, ) ] @@ -365,6 +368,7 @@ class Message(BaseMessage): reverse: bool = True, include_err: Optional[bool] = None, text_is_assistant_message: bool = False, + convert_summary_to_user: bool = True, ) -> List[LettaMessageSearchResult]: """Convert MessageSearchResult objects into LettaMessageSearchResult objects. @@ -385,6 +389,7 @@ class Message(BaseMessage): reverse=reverse, include_err=include_err, text_is_assistant_message=text_is_assistant_message, + convert_summary_to_user=convert_summary_to_user, ) for lm in letta_messages: @@ -445,8 +450,14 @@ class Message(BaseMessage): reverse: bool = True, include_err: Optional[bool] = None, text_is_assistant_message: bool = False, + convert_summary_to_user: bool = True, ) -> List[LettaMessage]: - """Convert message object (in DB format) to the style used by the original Letta API""" + """Convert message object (in DB format) to the style used by the original Letta API + + Args: + convert_summary_to_user: If True (default), summary messages are returned as UserMessage + for backward compatibility. If False, return as SummaryMessage. + """ messages = [] if self.role == MessageRole.assistant: @@ -468,6 +479,8 @@ class Message(BaseMessage): messages.append(self._convert_user_message()) elif self.role == MessageRole.system: messages.append(self._convert_system_message()) + elif self.role == MessageRole.summary: + messages.append(self._convert_summary_message(as_user_message=convert_summary_to_user)) elif self.role == MessageRole.approval: if self.content: messages.extend(self._convert_reasoning_messages(text_is_assistant_message=text_is_assistant_message)) @@ -1036,6 +1049,45 @@ class Message(BaseMessage): run_id=self.run_id, ) + def _convert_summary_message(self, as_user_message: bool = True) -> Union[SummaryMessage, UserMessage]: + """Convert summary role message to SummaryMessage or UserMessage. + + Args: + as_user_message: If True, return UserMessage for backward compatibility with + clients that don't support SummaryMessage. If False, return SummaryMessage. + """ + if self.content and len(self.content) == 1 and isinstance(self.content[0], TextContent): + text_content = self.content[0].text + else: + raise ValueError(f"Invalid summary message (no text object on message): {self.content}") + + # Unpack the summary from the packed JSON format + # The packed format is: {"type": "system_alert", "message": "...", "time": "..."} + summary = unpack_message(text_content) + + if as_user_message: + # Return as UserMessage for backward compatibility + return UserMessage( + id=self.id, + date=self.created_at, + content=summary, + name=self.name, + otid=self.otid, + sender_id=self.sender_id, + step_id=self.step_id, + is_err=self.is_err, + run_id=self.run_id, + ) + else: + return SummaryMessage( + id=self.id, + date=self.created_at, + summary=summary, + otid=self.otid, + step_id=self.step_id, + run_id=self.run_id, + ) + @staticmethod def dict_to_message( agent_id: str, @@ -1297,6 +1349,14 @@ class Message(BaseMessage): "role": self.role, } + elif self.role == "summary": + # Summary messages are converted to user messages (same as current system_alert behavior) + assert text_content is not None, vars(self) + openai_message = { + "content": text_content, + "role": "user", + } + elif self.role == "assistant" or self.role == "approval": try: assert self.tool_calls is not None or text_content is not None, vars(self) @@ -1474,6 +1534,16 @@ class Message(BaseMessage): message_dicts.append(user_dict) + elif self.role == "summary": + # Summary messages are converted to user messages (same as current system_alert behavior) + assert self.content and len(self.content) == 1 and isinstance(self.content[0], TextContent), vars(self) + message_dicts.append( + { + "role": "user", + "content": self.content[0].text, + } + ) + elif self.role == "assistant" or self.role == "approval": # Validate that message has content OpenAI Responses API can process if self.tool_calls is None and (self.content is None or len(self.content) == 0): @@ -1793,6 +1863,14 @@ class Message(BaseMessage): "role": self.role, } + elif self.role == "summary": + # Summary messages are converted to user messages (same as current system_alert behavior) + assert text_content is not None, vars(self) + anthropic_message = { + "content": text_content, + "role": "user", + } + elif self.role == "assistant" or self.role == "approval": # Validate that message has content Anthropic API can process if self.tool_calls is None and (self.content is None or len(self.content) == 0): @@ -2053,6 +2131,14 @@ class Message(BaseMessage): "parts": content_parts, } + elif self.role == "summary": + # Summary messages are converted to user messages (same as current system_alert behavior) + assert text_content is not None, vars(self) + google_ai_message = { + "role": "user", + "parts": [{"text": text_content}], + } + elif self.role == "assistant" or self.role == "approval": # Validate that message has content Google API can process if self.tool_calls is None and text_content is None and len(self.content) <= 1: @@ -2290,6 +2376,10 @@ class Message(BaseMessage): return self.role == "approval" and self.tool_calls is None and self.approve is not None def is_summarization_message(self) -> bool: + # First-class summary role (new format) + if self.role == "summary": + return True + # Legacy format: user message with system_alert content return ( self.role == "user" and self.content is not None diff --git a/letta/server/rest_api/routers/v1/agents.py b/letta/server/rest_api/routers/v1/agents.py index fcd3944e..947245ae 100644 --- a/letta/server/rest_api/routers/v1/agents.py +++ b/letta/server/rest_api/routers/v1/agents.py @@ -1608,6 +1608,7 @@ async def send_message( request_start_timestamp_ns=request_start_timestamp_ns, include_return_message_types=request.include_return_message_types, client_tools=request.client_tools, + include_compaction_messages=request.include_compaction_messages, ) else: result = await server.send_message_to_agent( @@ -1820,6 +1821,7 @@ async def _process_message_background( max_steps: int = DEFAULT_MAX_STEPS, include_return_message_types: list[MessageType] | None = None, override_model: str | None = None, + include_compaction_messages: bool = False, ) -> None: """Background task to process the message and update run status.""" request_start_timestamp_ns = get_utc_timestamp_ns() @@ -1866,6 +1868,7 @@ async def _process_message_background( use_assistant_message=use_assistant_message, request_start_timestamp_ns=request_start_timestamp_ns, include_return_message_types=include_return_message_types, + include_compaction_messages=include_compaction_messages, ) else: result = await server.send_message_to_agent( @@ -2048,6 +2051,7 @@ async def send_message_async( max_steps=request.max_steps, include_return_message_types=request.include_return_message_types, override_model=request.override_model, + include_compaction_messages=request.include_compaction_messages, ), label=f"process_message_background_{run.id}", ) diff --git a/letta/server/rest_api/routers/v1/conversations.py b/letta/server/rest_api/routers/v1/conversations.py index 88dff474..eb8bd948 100644 --- a/letta/server/rest_api/routers/v1/conversations.py +++ b/letta/server/rest_api/routers/v1/conversations.py @@ -265,6 +265,7 @@ async def send_conversation_message( include_return_message_types=request.include_return_message_types, client_tools=request.client_tools, conversation_id=conversation_id, + include_compaction_messages=request.include_compaction_messages, ) diff --git a/letta/services/streaming_service.py b/letta/services/streaming_service.py index 82057622..9ed43891 100644 --- a/letta/services/streaming_service.py +++ b/letta/services/streaming_service.py @@ -150,6 +150,7 @@ class StreamingService: actor=actor, conversation_id=conversation_id, client_tools=request.client_tools, + include_compaction_messages=request.include_compaction_messages, ) # handle background streaming if requested @@ -326,6 +327,7 @@ class StreamingService: actor: User, conversation_id: Optional[str] = None, client_tools: Optional[list[ClientToolSchema]] = None, + include_compaction_messages: bool = False, ) -> AsyncIterator: """ Create a stream with unified error handling. @@ -354,6 +356,7 @@ class StreamingService: include_return_message_types=include_return_message_types, conversation_id=conversation_id, client_tools=client_tools, + include_compaction_messages=include_compaction_messages, ) async for chunk in stream: diff --git a/tests/integration_test_summarizer.py b/tests/integration_test_summarizer.py index fec63a4d..f865f368 100644 --- a/tests/integration_test_summarizer.py +++ b/tests/integration_test_summarizer.py @@ -20,7 +20,7 @@ from letta.schemas.agent import CreateAgent, UpdateAgent from letta.schemas.block import BlockUpdate, CreateBlock from letta.schemas.embedding_config import EmbeddingConfig from letta.schemas.enums import MessageRole -from letta.schemas.letta_message import LettaMessage +from letta.schemas.letta_message import EventMessage, LettaMessage, SummaryMessage from letta.schemas.letta_message_content import TextContent, ToolCallContent, ToolReturnContent from letta.schemas.llm_config import LLMConfig from letta.schemas.message import Message as PydanticMessage, MessageCreate @@ -725,7 +725,7 @@ async def test_summarize_with_mode(server: SyncServer, actor, llm_config: LLMCon agent_loop = LettaAgentV3(agent_state=agent_state, actor=actor) - summary, result, _ = await agent_loop.compact(messages=in_context_messages) + summary, result, summary_text = await agent_loop.compact(messages=in_context_messages) assert isinstance(result, list) @@ -734,10 +734,16 @@ async def test_summarize_with_mode(server: SyncServer, actor, llm_config: LLMCon assert hasattr(msg, "role") assert hasattr(msg, "content") + # Verify the summary text (third return value) is a non-empty string. + # This is used by the agent loop to construct a SummaryMessage for clients. + assert isinstance(summary_text, str), f"Expected summary_text to be a string, got {type(summary_text)}" + assert len(summary_text) > 0, "Expected non-empty summary text" + print() print(f"RESULTS {mode} ======") for msg in result: print(f"MSG: {msg}") + print(f"SUMMARY TEXT: {summary_text[:200]}...") print() @@ -759,6 +765,87 @@ async def test_summarize_with_mode(server: SyncServer, actor, llm_config: LLMCon assert result[1].role == MessageRole.user +@pytest.mark.asyncio +@pytest.mark.parametrize( + "llm_config", + TESTED_LLM_CONFIGS, + ids=[c.model for c in TESTED_LLM_CONFIGS], +) +async def test_compact_returns_valid_summary_message_and_event_message(server: SyncServer, actor, llm_config: LLMConfig): + """ + Test that compact() return values can be used to construct valid SummaryMessage and EventMessage objects. + + This validates the contract that _step() relies on: compact() returns + (summary_message_obj, compacted_messages, summary_text) where summary_text + is used to build a SummaryMessage and the metadata is used for an EventMessage. + """ + import uuid + + from letta.helpers.datetime_helpers import get_utc_time + + # Create a conversation with enough messages to summarize + messages = [ + PydanticMessage( + role=MessageRole.system, + content=[TextContent(type="text", text="You are a helpful assistant.")], + ) + ] + for i in range(10): + messages.append( + PydanticMessage( + role=MessageRole.user, + content=[TextContent(type="text", text=f"User message {i}: Test message {i}.")], + ) + ) + messages.append( + PydanticMessage( + role=MessageRole.assistant, + content=[TextContent(type="text", text=f"Assistant response {i}: Acknowledged message {i}.")], + ) + ) + + agent_state, in_context_messages = await create_agent_with_messages(server, actor, llm_config, messages) + + handle = llm_config.handle or f"{llm_config.model_endpoint_type}/{llm_config.model}" + agent_state.compaction_settings = CompactionSettings(model=handle, mode="all") + + agent_loop = LettaAgentV3(agent_state=agent_state, actor=actor) + + summary_message_obj, compacted_messages, summary_text = await agent_loop.compact(messages=in_context_messages) + + # Verify we can construct a valid SummaryMessage from compact() return values + summary_msg = SummaryMessage( + id=summary_message_obj.id, + date=summary_message_obj.created_at, + summary=summary_text, + otid=PydanticMessage.generate_otid_from_id(summary_message_obj.id, 0), + step_id=None, + run_id=None, + ) + assert summary_msg.message_type == "summary_message" + assert isinstance(summary_msg.summary, str) + assert len(summary_msg.summary) > 0 + assert summary_msg.id == summary_message_obj.id + + # Verify we can construct a valid EventMessage for compaction + event_msg = EventMessage( + id=str(uuid.uuid4()), + date=get_utc_time(), + event_type="compaction", + event_data={ + "trigger": "post_step_context_check", + "context_token_estimate": 1000, + "context_window": agent_state.llm_config.context_window, + }, + run_id=None, + step_id=None, + ) + assert event_msg.message_type == "event_message" + assert event_msg.event_type == "compaction" + assert "trigger" in event_msg.event_data + assert "context_window" in event_msg.event_data + + @pytest.mark.asyncio async def test_v3_compact_uses_compaction_settings_model_and_model_settings(server: SyncServer, actor): """Integration test: LettaAgentV3.compact uses the LLMConfig implied by CompactionSettings. @@ -911,7 +998,7 @@ async def test_v3_summarize_hard_eviction_when_still_over_threshold( caplog.set_level("ERROR") - summary, result, _ = await agent_loop.compact( + summary, result, summary_text = await agent_loop.compact( messages=in_context_messages, trigger_threshold=context_limit, ) @@ -932,6 +1019,10 @@ async def test_v3_summarize_hard_eviction_when_still_over_threshold( assert result[0].role == MessageRole.system assert result[1].role == MessageRole.user + # Verify the summary text is returned (used to construct SummaryMessage in the agent loop) + assert isinstance(summary_text, str), f"Expected summary_text to be a string, got {type(summary_text)}" + assert len(summary_text) > 0, "Expected non-empty summary text after hard eviction" + # ====================================================================================================================== # Sliding Window Summarizer Unit Tests