From f1bd246e9b5b3f78f32c682501020a3de6afb5fd Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Mon, 15 Dec 2025 18:52:06 -0800 Subject: [PATCH] feat: use token streaming for anthropic summarization (#7105) --- letta/services/summarizer/summarizer.py | 71 ++++++++++++++++++++----- tests/integration_test_summarizer.py | 45 ++++++++++++++++ 2 files changed, 103 insertions(+), 13 deletions(-) diff --git a/letta/services/summarizer/summarizer.py b/letta/services/summarizer/summarizer.py index 4f35b33a..8b087fbd 100644 --- a/letta/services/summarizer/summarizer.py +++ b/letta/services/summarizer/summarizer.py @@ -16,7 +16,7 @@ from letta.llm_api.llm_client import LLMClient from letta.log import get_logger from letta.otel.tracing import trace_method from letta.prompts import gpt_summarize -from letta.schemas.enums import AgentType, MessageRole +from letta.schemas.enums import AgentType, MessageRole, ProviderType from letta.schemas.letta_message_content import TextContent from letta.schemas.llm_config import LLMConfig from letta.schemas.message import Message, MessageCreate @@ -457,9 +457,63 @@ async def simple_summary( summarizer_llm_config.put_inner_thoughts_in_kwargs = False summarizer_llm_config.enable_reasoner = False + async def _run_summarizer_request(req_data: dict, req_messages_obj: list[Message]) -> str: + """Run summarization request and return assistant text. + + For Anthropic, use provider-side streaming to avoid long-request failures + (Anthropic requires streaming for requests that may exceed ~10 minutes). + """ + + if summarizer_llm_config.model_endpoint_type in [ProviderType.anthropic, ProviderType.bedrock]: + logger.info( + "Summarizer: using provider streaming (%s/%s) to avoid long-request failures", + summarizer_llm_config.model_endpoint_type, + summarizer_llm_config.model, + ) + # Stream from provider and accumulate the final assistant text. + from letta.interfaces.anthropic_parallel_tool_call_streaming_interface import ( + SimpleAnthropicStreamingInterface, + ) + + interface = SimpleAnthropicStreamingInterface( + requires_approval_tools=[], + run_id=None, + step_id=None, + ) + + # AnthropicClient.stream_async sets request_data["stream"] = True internally. + stream = await llm_client.stream_async(req_data, summarizer_llm_config) + async for _chunk in interface.process(stream): + # We don't emit anything; we just want the fully-accumulated content. + pass + + content_parts = interface.get_content() + text = "".join(part.text for part in content_parts if isinstance(part, TextContent)).strip() + if not text: + logger.warning("No content returned from summarizer (streaming path)") + raise Exception("Summary failed to generate") + return text + + # Default: non-streaming provider request, then normalize via chat-completions conversion. + logger.debug( + "Summarizer: using non-streaming request (%s/%s)", + summarizer_llm_config.model_endpoint_type, + summarizer_llm_config.model, + ) + response_data = await llm_client.request_async(req_data, summarizer_llm_config) + response = await llm_client.convert_response_to_chat_completion( + response_data, + req_messages_obj, + summarizer_llm_config, + ) + if response.choices[0].message.content is None: + logger.warning("No content returned from summarizer") + raise Exception("Summary failed to generate") + return response.choices[0].message.content.strip() + request_data = llm_client.build_request_data(AgentType.letta_v1_agent, input_messages_obj, summarizer_llm_config, tools=[]) try: - response_data = await llm_client.request_async(request_data, summarizer_llm_config) + summary = await _run_summarizer_request(request_data, input_messages_obj) except Exception as e: # handle LLM error (likely a context window exceeded error) try: @@ -497,7 +551,7 @@ async def simple_summary( ) try: - response_data = await llm_client.request_async(request_data, summarizer_llm_config) + summary = await _run_summarizer_request(request_data, input_messages_obj) except Exception as fallback_error_a: # Fallback B: hard-truncate the user transcript to fit a conservative char budget logger.warning(f"Clamped tool returns still overflowed ({fallback_error_a}). Falling back to transcript truncation.") @@ -534,21 +588,12 @@ async def simple_summary( tools=[], ) try: - response_data = await llm_client.request_async(request_data, summarizer_llm_config) + summary = await _run_summarizer_request(request_data, input_messages_obj) except Exception as fallback_error_b: logger.error(f"Transcript truncation fallback also failed: {fallback_error_b}. Propagating error.") logger.info(f"Full fallback summarization payload: {request_data}") raise llm_client.handle_llm_error(fallback_error_b) - response = await llm_client.convert_response_to_chat_completion(response_data, input_messages_obj, summarizer_llm_config) - if response.choices[0].message.content is None: - logger.warning("No content returned from summarizer") - # TODO raise an error error instead? - # return "[Summary failed to generate]" - raise Exception("Summary failed to generate") - else: - summary = response.choices[0].message.content.strip() - return summary diff --git a/tests/integration_test_summarizer.py b/tests/integration_test_summarizer.py index 8736b8e7..c2a7441f 100644 --- a/tests/integration_test_summarizer.py +++ b/tests/integration_test_summarizer.py @@ -27,6 +27,8 @@ from letta.schemas.message import Message as PydanticMessage, MessageCreate from letta.schemas.run import Run as PydanticRun from letta.server.server import SyncServer from letta.services.run_manager import RunManager +from letta.services.summarizer.summarizer import simple_summary +from letta.settings import model_settings # Constants DEFAULT_EMBEDDING_CONFIG = EmbeddingConfig.default_config(provider="openai") @@ -240,6 +242,49 @@ async def test_summarize_empty_message_buffer(server: SyncServer, actor, llm_con assert "No assistant message found" in str(e) or "empty" in str(e).lower() +@pytest.mark.asyncio +@pytest.mark.skipif( + not model_settings.anthropic_api_key, + reason="Missing LETTA_ANTHROPIC_API_KEY (or equivalent settings) for Anthropic integration test", +) +async def test_simple_summary_anthropic_uses_streaming_and_returns_summary(actor, monkeypatch): + """Regression test: Anthropic summarization must use streaming and return real text.""" + + # If the summarizer ever falls back to a non-streaming Anthropic call, make it fail fast. + from letta.llm_api.anthropic_client import AnthropicClient + + async def _nope_request_async(self, *args, **kwargs): + raise AssertionError("Anthropic summarizer should not call request_async (must use streaming)") + + monkeypatch.setattr(AnthropicClient, "request_async", _nope_request_async) + + # Keep the prompt tiny so this is fast and cheap. + messages = [ + PydanticMessage( + role=MessageRole.user, + content=[TextContent(type="text", text="I'm planning a trip to Paris in April.")], + ), + PydanticMessage( + role=MessageRole.assistant, + content=[ + TextContent( + type="text", + text="Great—your priorities are museums and cafes, and you want to stay under $200/day.", + ) + ], + ), + ] + + anthropic_config = get_llm_config("claude-4-5-haiku.json") + + summary = await simple_summary(messages=messages, llm_config=anthropic_config, actor=actor) + + assert isinstance(summary, str) + assert len(summary) > 10 + # Sanity-check that the model is summarizing the right conversation. + assert any(token in summary.lower() for token in ["paris", "april", "museum", "cafe", "$200", "200"]) + + @pytest.mark.asyncio @pytest.mark.parametrize( "llm_config",