From d81382b9fe3f980140433add4b1a3261a4f09be1 Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Sun, 6 Jul 2025 20:15:22 -0700 Subject: [PATCH] refactor: revert the default summarizer to the old style (#3168) --- letta/agents/letta_agent.py | 31 ++- letta/agents/voice_agent.py | 2 +- letta/agents/voice_sleeptime_agent.py | 2 +- letta/prompts/gpt_summarize.py | 10 +- letta/server/rest_api/routers/v1/agents.py | 19 ++ letta/services/summarizer/enums.py | 1 + letta/services/summarizer/summarizer.py | 245 +++++++++++++++++- letta/settings.py | 7 +- letta/system.py | 16 ++ .../llm_model_configs/openai-gpt-4o-mini.json | 2 +- 10 files changed, 311 insertions(+), 24 deletions(-) diff --git a/letta/agents/letta_agent.py b/letta/agents/letta_agent.py index 65478b31..9a6aaadb 100644 --- a/letta/agents/letta_agent.py +++ b/letta/agents/letta_agent.py @@ -82,11 +82,16 @@ class LettaAgent(BaseAgent): step_manager: StepManager = NoopStepManager(), telemetry_manager: TelemetryManager = NoopTelemetryManager(), current_run_id: str | None = None, + ## summarizer settings + summarizer_mode: SummarizationMode = summarizer_settings.mode, + # for static_buffer mode summary_block_label: str = DEFAULT_SUMMARY_BLOCK_LABEL, message_buffer_limit: int = summarizer_settings.message_buffer_limit, message_buffer_min: int = summarizer_settings.message_buffer_min, enable_summarization: bool = summarizer_settings.enable_summarization, max_summarization_retries: int = summarizer_settings.max_summarization_retries, + # for partial_evict mode + partial_evict_summarizer_percentage: float = summarizer_settings.partial_evict_summarizer_percentage, ): super().__init__(agent_id=agent_id, openai_client=None, message_manager=message_manager, agent_manager=agent_manager, actor=actor) @@ -124,11 +129,13 @@ class LettaAgent(BaseAgent): ) self.summarizer = Summarizer( - mode=SummarizationMode(summarizer_settings.mode), + mode=summarizer_mode, + # TODO consolidate to not use this, or push it into the 
Summarizer() class summarizer_agent=self.summarization_agent, # TODO: Make this configurable message_buffer_limit=message_buffer_limit, message_buffer_min=message_buffer_min, + partial_evict_summarizer_percentage=partial_evict_summarizer_percentage, ) async def _check_run_cancellation(self) -> bool: @@ -872,25 +879,35 @@ class LettaAgent(BaseAgent): self.logger.warning( f"Total tokens {total_tokens} exceeds configured max tokens {llm_config.context_window}, forcefully clearing message history." ) - new_in_context_messages, updated = self.summarizer.summarize( - in_context_messages=in_context_messages, new_letta_messages=new_letta_messages, force=True, clear=True + new_in_context_messages, updated = await self.summarizer.summarize( + in_context_messages=in_context_messages, + new_letta_messages=new_letta_messages, + force=True, + clear=True, ) else: - new_in_context_messages, updated = self.summarizer.summarize( - in_context_messages=in_context_messages, new_letta_messages=new_letta_messages + self.logger.info( + f"Total tokens {total_tokens} does not exceed configured max tokens {llm_config.context_window}, passing summarizing w/o force." 
+ ) + new_in_context_messages, updated = await self.summarizer.summarize( + in_context_messages=in_context_messages, + new_letta_messages=new_letta_messages, ) await self.agent_manager.set_in_context_messages_async( - agent_id=self.agent_id, message_ids=[m.id for m in new_in_context_messages], actor=self.actor + agent_id=self.agent_id, + message_ids=[m.id for m in new_in_context_messages], + actor=self.actor, ) return new_in_context_messages @trace_method async def summarize_conversation_history(self) -> AgentState: + """Called when the developer explicitly triggers compaction via the API""" agent_state = await self.agent_manager.get_agent_by_id_async(agent_id=self.agent_id, actor=self.actor) message_ids = agent_state.message_ids in_context_messages = await self.message_manager.get_messages_by_ids_async(message_ids=message_ids, actor=self.actor) - new_in_context_messages, updated = self.summarizer.summarize( + new_in_context_messages, updated = await self.summarizer.summarize( in_context_messages=in_context_messages, new_letta_messages=[], force=True ) return await self.agent_manager.set_in_context_messages_async( diff --git a/letta/agents/voice_agent.py b/letta/agents/voice_agent.py index 63ee8da2..0c77626e 100644 --- a/letta/agents/voice_agent.py +++ b/letta/agents/voice_agent.py @@ -295,7 +295,7 @@ class VoiceAgent(BaseAgent): new_letta_messages = await self.message_manager.create_many_messages_async(letta_message_db_queue, actor=self.actor) # TODO: Make this more general and configurable, less brittle - new_in_context_messages, updated = summarizer.summarize( + new_in_context_messages, updated = await summarizer.summarize( in_context_messages=in_context_messages, new_letta_messages=new_letta_messages ) diff --git a/letta/agents/voice_sleeptime_agent.py b/letta/agents/voice_sleeptime_agent.py index 1d997be2..e9d013f9 100644 --- a/letta/agents/voice_sleeptime_agent.py +++ b/letta/agents/voice_sleeptime_agent.py @@ -90,7 +90,7 @@ class 
VoiceSleeptimeAgent(LettaAgent): current_in_context_messages, new_in_context_messages, stop_reason, usage = await super()._step( agent_state=agent_state, input_messages=input_messages, max_steps=max_steps ) - new_in_context_messages, updated = self.summarizer.summarize( + new_in_context_messages, updated = await self.summarizer.summarize( in_context_messages=current_in_context_messages, new_letta_messages=new_in_context_messages ) self.agent_manager.set_in_context_messages( diff --git a/letta/prompts/gpt_summarize.py b/letta/prompts/gpt_summarize.py index 945268de..c9e9ccdd 100644 --- a/letta/prompts/gpt_summarize.py +++ b/letta/prompts/gpt_summarize.py @@ -1,14 +1,12 @@ WORD_LIMIT = 100 -SYSTEM = f""" -Your job is to summarize a history of previous messages in a conversation between an AI persona and a human. +SYSTEM = f"""Your job is to summarize a history of previous messages in a conversation between an AI persona and a human. The conversation you are given is a from a fixed context window and may not be complete. Messages sent by the AI are marked with the 'assistant' role. -The AI 'assistant' can also make calls to functions, whose outputs can be seen in messages with the 'function' role. +The AI 'assistant' can also make calls to tools, whose outputs can be seen in messages with the 'tool' role. Things the AI says in the message content are considered inner monologue and are not seen by the user. The only AI messages seen by the user are from when the AI uses 'send_message'. Messages the user sends are in the 'user' role. The 'user' role is also used for important system events, such as login events and heartbeat events (heartbeats run the AI's program without user action, allowing the AI to act without prompting from the user sending them a message). -Summarize what happened in the conversation from the perspective of the AI (use the first person). 
+Summarize what happened in the conversation from the perspective of the AI (use the first person from the perspective of the AI). Keep your summary less than {WORD_LIMIT} words, do NOT exceed this word limit. -Only output the summary, do NOT include anything else in your output. -""" +Only output the summary, do NOT include anything else in your output.""" diff --git a/letta/server/rest_api/routers/v1/agents.py b/letta/server/rest_api/routers/v1/agents.py index 7f8ce572..6f0df27d 100644 --- a/letta/server/rest_api/routers/v1/agents.py +++ b/letta/server/rest_api/routers/v1/agents.py @@ -38,6 +38,7 @@ from letta.schemas.user import User from letta.serialize_schemas.pydantic_agent_schema import AgentSchema from letta.server.rest_api.utils import get_letta_server from letta.server.server import SyncServer +from letta.services.summarizer.enums import SummarizationMode from letta.services.telemetry_manager import NoopTelemetryManager from letta.settings import settings from letta.utils import safe_create_task @@ -750,6 +751,12 @@ async def send_message( step_manager=server.step_manager, telemetry_manager=server.telemetry_manager if settings.llm_api_logging else NoopTelemetryManager(), current_run_id=run.id, + # summarizer settings to be added here + summarizer_mode=( + SummarizationMode.STATIC_MESSAGE_BUFFER + if agent.agent_type == AgentType.voice_convo_agent + else SummarizationMode.PARTIAL_EVICT_MESSAGE_BUFFER + ), ) result = await agent_loop.step( @@ -878,6 +885,12 @@ async def send_message_streaming( step_manager=server.step_manager, telemetry_manager=server.telemetry_manager if settings.llm_api_logging else NoopTelemetryManager(), current_run_id=run.id, + # summarizer settings to be added here + summarizer_mode=( + SummarizationMode.STATIC_MESSAGE_BUFFER + if agent.agent_type == AgentType.voice_convo_agent + else SummarizationMode.PARTIAL_EVICT_MESSAGE_BUFFER + ), ) from letta.server.rest_api.streaming_response import StreamingResponseWithStatusCode @@ -1014,6 
+1027,12 @@ async def _process_message_background( actor=actor, step_manager=server.step_manager, telemetry_manager=server.telemetry_manager if settings.llm_api_logging else NoopTelemetryManager(), + # summarizer settings to be added here + summarizer_mode=( + SummarizationMode.STATIC_MESSAGE_BUFFER + if agent.agent_type == AgentType.voice_convo_agent + else SummarizationMode.PARTIAL_EVICT_MESSAGE_BUFFER + ), ) result = await agent_loop.step( diff --git a/letta/services/summarizer/enums.py b/letta/services/summarizer/enums.py index 33c42d65..620ec332 100644 --- a/letta/services/summarizer/enums.py +++ b/letta/services/summarizer/enums.py @@ -7,3 +7,4 @@ class SummarizationMode(str, Enum): """ STATIC_MESSAGE_BUFFER = "static_message_buffer_mode" + PARTIAL_EVICT_MESSAGE_BUFFER = "partial_evict_message_buffer_mode" diff --git a/letta/services/summarizer/summarizer.py b/letta/services/summarizer/summarizer.py index 7795117e..699998e6 100644 --- a/letta/services/summarizer/summarizer.py +++ b/letta/services/summarizer/summarizer.py @@ -4,13 +4,19 @@ import traceback from typing import List, Optional, Tuple, Union from letta.agents.ephemeral_summary_agent import EphemeralSummaryAgent -from letta.constants import DEFAULT_MESSAGE_TOOL, DEFAULT_MESSAGE_TOOL_KWARG +from letta.constants import DEFAULT_MESSAGE_TOOL, DEFAULT_MESSAGE_TOOL_KWARG, MESSAGE_SUMMARY_REQUEST_ACK +from letta.helpers.message_helper import convert_message_creates_to_messages +from letta.llm_api.llm_client import LLMClient from letta.log import get_logger from letta.otel.tracing import trace_method +from letta.prompts import gpt_summarize from letta.schemas.enums import MessageRole from letta.schemas.letta_message_content import TextContent +from letta.schemas.llm_config import LLMConfig from letta.schemas.message import Message, MessageCreate +from letta.schemas.user import User from letta.services.summarizer.enums import SummarizationMode +from letta.system import package_summarize_message_no_counts 
from letta.templates.template_helper import render_template logger = get_logger(__name__) @@ -29,18 +35,24 @@ class Summarizer: summarizer_agent: Optional[Union[EphemeralSummaryAgent, "VoiceSleeptimeAgent"]] = None, message_buffer_limit: int = 10, message_buffer_min: int = 3, + partial_evict_summarizer_percentage: float = 0.30, ): self.mode = mode # Need to do validation on this + # TODO: Move this to config self.message_buffer_limit = message_buffer_limit self.message_buffer_min = message_buffer_min self.summarizer_agent = summarizer_agent - # TODO: Move this to config + self.partial_evict_summarizer_percentage = partial_evict_summarizer_percentage @trace_method - def summarize( - self, in_context_messages: List[Message], new_letta_messages: List[Message], force: bool = False, clear: bool = False + async def summarize( + self, + in_context_messages: List[Message], + new_letta_messages: List[Message], + force: bool = False, + clear: bool = False, ) -> Tuple[List[Message], bool]: """ Summarizes or trims in_context_messages according to the chosen mode, @@ -58,7 +70,19 @@ class Summarizer: (could be appended to the conversation if desired) """ if self.mode == SummarizationMode.STATIC_MESSAGE_BUFFER: - return self._static_buffer_summarization(in_context_messages, new_letta_messages, force=force, clear=clear) + return self._static_buffer_summarization( + in_context_messages, + new_letta_messages, + force=force, + clear=clear, + ) + elif self.mode == SummarizationMode.PARTIAL_EVICT_MESSAGE_BUFFER: + return await self._partial_evict_buffer_summarization( + in_context_messages, + new_letta_messages, + force=force, + clear=clear, + ) else: # Fallback or future logic return in_context_messages, False @@ -75,9 +99,131 @@ class Summarizer: task.add_done_callback(callback) return task - def _static_buffer_summarization( - self, in_context_messages: List[Message], new_letta_messages: List[Message], force: bool = False, clear: bool = False + async def 
_partial_evict_buffer_summarization( + self, + in_context_messages: List[Message], + new_letta_messages: List[Message], + force: bool = False, + clear: bool = False, ) -> Tuple[List[Message], bool]: + """Summarization as implemented in the original MemGPT loop, but using message count instead of token count. + Evict a partial amount of messages, and replace message[1] with a recursive summary. + + Note that this can't be made sync, because we're waiting on the summary to inject it into the context window, + unlike the version that writes it to a block. + + Unless force is True, don't summarize. + Ignore clear, we don't use it. + """ + all_in_context_messages = in_context_messages + new_letta_messages + + if not force: + logger.debug("Not forcing summarization, returning in-context messages as is.") + return all_in_context_messages, False + + # Very ugly code to pull LLMConfig etc from the SummarizerAgent if we're not using it for anything else + assert self.summarizer_agent is not None + + # First step: determine how many messages to retain + total_message_count = len(all_in_context_messages) + assert self.partial_evict_summarizer_percentage >= 0.0 and self.partial_evict_summarizer_percentage <= 1.0 + target_message_start = round((1.0 - self.partial_evict_summarizer_percentage) * total_message_count) + logger.info(f"Target message count: {total_message_count}->{(total_message_count-target_message_start)}") + + # The summary message we'll insert is role 'user' (vs 'assistant', 'tool', or 'system') + # We are going to put it at index 1 (index 0 is the system message) + # That means that index 2 needs to be role 'assistant', so walk up the list starting at + # the target_message_count and find the first assistant message + for i in range(target_message_start, total_message_count): + if all_in_context_messages[i].role == MessageRole.assistant: + assistant_message_index = i + break + else: + raise ValueError(f"No assistant message found from indices 
{target_message_start} to {total_message_count}") + + # The sequence to summarize is index 1 -> assistant_message_index + messages_to_summarize = all_in_context_messages[1:assistant_message_index] + logger.info(f"Eviction indices: {1}->{assistant_message_index}(/{total_message_count})") + + # Dynamically get the LLMConfig from the summarizer agent + # Pretty cringe code here that we need the agent for this but we don't use it + agent_state = await self.summarizer_agent.agent_manager.get_agent_by_id_async( + agent_id=self.summarizer_agent.agent_id, actor=self.summarizer_agent.actor + ) + + # TODO if we do this via the "agent", then we can more easily allow toggling on the memory block version + summary_message_str = await simple_summary( + messages=messages_to_summarize, + llm_config=agent_state.llm_config, + actor=self.summarizer_agent.actor, + include_ack=True, + ) + + # TODO add counts back + # Recall message count + # num_recall_messages_current = await self.message_manager.size_async(actor=self.actor, agent_id=agent_state.id) + # num_messages_evicted = len(messages_to_summarize) + # num_recall_messages_hidden = num_recall_messages_total - len() + + # Create the summary message + summary_message_str_packed = package_summarize_message_no_counts( + summary=summary_message_str, + timezone=agent_state.timezone, + ) + summary_message_obj = convert_message_creates_to_messages( + message_creates=[ + MessageCreate( + role=MessageRole.user, + content=[TextContent(text=summary_message_str_packed)], + ) + ], + agent_id=agent_state.id, + timezone=agent_state.timezone, + # We already packed, don't pack again + wrap_user_message=False, + wrap_system_message=False, + )[0] + + # Create the message in the DB + await self.summarizer_agent.message_manager.create_many_messages_async( + pydantic_msgs=[summary_message_obj], + actor=self.summarizer_agent.actor, + ) + + updated_in_context_messages = all_in_context_messages[assistant_message_index:] + return [all_in_context_messages[0], 
summary_message_obj] + updated_in_context_messages, True + + def _static_buffer_summarization( + self, + in_context_messages: List[Message], + new_letta_messages: List[Message], + force: bool = False, + clear: bool = False, + ) -> Tuple[List[Message], bool]: + """ + Implements static buffer summarization by maintaining a fixed-size message buffer (< N messages). + + Logic: + 1. Combine existing context messages with new messages + 2. If total messages <= buffer limit and not forced, return unchanged + 3. Calculate how many messages to retain (0 if clear=True, otherwise message_buffer_min) + 4. Find the trim index to keep the most recent messages while preserving user message boundaries + 5. Evict older messages (everything between system message and trim index) + 6. If summarizer agent is available, trigger background summarization of evicted messages + 7. Return updated context with system message + retained recent messages + + Args: + in_context_messages: Existing conversation context messages + new_letta_messages: Newly added messages to append + force: Force summarization even if buffer limit not exceeded + clear: Clear all messages except system message (retain_count = 0) + + Returns: + Tuple of (updated_messages, was_summarized) + - updated_messages: New context after trimming/summarization + - was_summarized: True if messages were evicted and summarization triggered + """ + all_in_context_messages = in_context_messages + new_letta_messages if len(all_in_context_messages) <= self.message_buffer_limit and not force: @@ -139,6 +285,91 @@ class Summarizer: return [all_in_context_messages[0]] + updated_in_context_messages, True +def simple_formatter(messages: List[Message], include_system: bool = False) -> str: + """Go from an OpenAI-style list of messages to a concatenated string""" + + parsed_messages = [message.to_openai_dict() for message in messages if message.role != MessageRole.system or include_system] + return "\n".join(json.dumps(msg) for msg in 
parsed_messages) + + +def simple_message_wrapper(openai_msg: dict) -> Message: + """Extremely simple way to map from role/content to Message object w/ throwaway dummy fields""" + + if "role" not in openai_msg: + raise ValueError(f"Missing role in openai_msg: {openai_msg}") + if "content" not in openai_msg: + raise ValueError(f"Missing content in openai_msg: {openai_msg}") + + if openai_msg["role"] == "user": + return Message( + role=MessageRole.user, + content=[TextContent(text=openai_msg["content"])], + ) + elif openai_msg["role"] == "assistant": + return Message( + role=MessageRole.assistant, + content=[TextContent(text=openai_msg["content"])], + ) + elif openai_msg["role"] == "system": + return Message( + role=MessageRole.system, + content=[TextContent(text=openai_msg["content"])], + ) + else: + raise ValueError(f"Unknown role: {openai_msg['role']}") + + +async def simple_summary(messages: List[Message], llm_config: LLMConfig, actor: User, include_ack: bool = True) -> str: + """Generate a simple summary from a list of messages. + + Intentionally kept functional due to the simplicity of the prompt. 
+    """ + + # Create an LLMClient from the config + llm_client = LLMClient.create( + provider_type=llm_config.model_endpoint_type, + put_inner_thoughts_first=True, + actor=actor, + ) + assert llm_client is not None + + # Prepare the messages payload to send to the LLM + system_prompt = gpt_summarize.SYSTEM + summary_transcript = simple_formatter(messages) + + if include_ack: + input_messages = [ + {"role": "system", "content": system_prompt}, + {"role": "assistant", "content": MESSAGE_SUMMARY_REQUEST_ACK}, + {"role": "user", "content": summary_transcript}, + ] + else: + input_messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": summary_transcript}, + ] + print("messages going to summarizer:", input_messages) + input_messages_obj = [simple_message_wrapper(msg) for msg in input_messages] + print("messages going to summarizer (objs):", input_messages_obj) + + request_data = llm_client.build_request_data(input_messages_obj, llm_config, tools=[]) + print("request data:", request_data) + # NOTE: we should disable the inner_thoughts_in_kwargs here, because we don't use it + # I'm leaving it commented out for now for safety but is fine assuming the var here is a copy not a reference + # llm_config.put_inner_thoughts_in_kwargs = False + response_data = await llm_client.request_async(request_data, llm_config) + response = llm_client.convert_response_to_chat_completion(response_data, input_messages_obj, llm_config) + if response.choices[0].message.content is None: + logger.warning("No content returned from summarizer") + # TODO raise an error instead? + # return "[Summary failed to generate]" + raise Exception("Summary failed to generate") + else: + summary = response.choices[0].message.content.strip() + + return summary + + def format_transcript(messages: List[Message], include_system: bool = False) -> List[str]: """ Turn a list of Message objects into a human-readable transcript. 
diff --git a/letta/settings.py b/letta/settings.py index 1e8b9134..fb2751c3 100644 --- a/letta/settings.py +++ b/letta/settings.py @@ -39,12 +39,17 @@ class ToolSettings(BaseSettings): class SummarizerSettings(BaseSettings): model_config = SettingsConfigDict(env_prefix="letta_summarizer_", extra="ignore") - mode: SummarizationMode = SummarizationMode.STATIC_MESSAGE_BUFFER + # mode: SummarizationMode = SummarizationMode.STATIC_MESSAGE_BUFFER + mode: SummarizationMode = SummarizationMode.PARTIAL_EVICT_MESSAGE_BUFFER message_buffer_limit: int = 60 message_buffer_min: int = 15 enable_summarization: bool = True max_summarization_retries: int = 3 + # partial evict summarizer percentage + # eviction based on percentage of message count, not token count + partial_evict_summarizer_percentage: float = 0.30 + # TODO(cliandy): the below settings are tied to old summarization and should be deprecated or moved # Controls if we should evict all messages # TODO: Can refactor this into an enum if we have a bunch of different kinds of summarizers diff --git a/letta/system.py b/letta/system.py index 33337569..e4031bcc 100644 --- a/letta/system.py +++ b/letta/system.py @@ -188,6 +188,22 @@ def package_summarize_message(summary, summary_message_count, hidden_message_cou return json_dumps(packaged_message) +def package_summarize_message_no_counts(summary, timezone): + context_message = ( + f"Note: prior messages have been hidden from view due to conversation memory constraints.\n" + + f"The following is a summary of the previous messages:\n {summary}" + ) + + formatted_time = get_local_time(timezone=timezone) + packaged_message = { + "type": "system_alert", + "message": context_message, + "time": formatted_time, + } + + return json_dumps(packaged_message) + + def package_summarize_message_no_summary(hidden_message_count, message=None, timezone=None): """Add useful metadata to the summary message""" diff --git a/tests/configs/llm_model_configs/openai-gpt-4o-mini.json 
b/tests/configs/llm_model_configs/openai-gpt-4o-mini.json index 0e6c32b2..661b8aa1 100644 --- a/tests/configs/llm_model_configs/openai-gpt-4o-mini.json +++ b/tests/configs/llm_model_configs/openai-gpt-4o-mini.json @@ -1,5 +1,5 @@ { - "context_window": 8192, + "context_window": 128000, "model": "gpt-4o-mini", "model_endpoint_type": "openai", "model_endpoint": "https://api.openai.com/v1",