feat: use token streaming for anthropic summarization (#7105)
This commit is contained in:
committed by
Caren Thomas
parent
857139f907
commit
f1bd246e9b
@@ -16,7 +16,7 @@ from letta.llm_api.llm_client import LLMClient
|
||||
from letta.log import get_logger
|
||||
from letta.otel.tracing import trace_method
|
||||
from letta.prompts import gpt_summarize
|
||||
from letta.schemas.enums import AgentType, MessageRole
|
||||
from letta.schemas.enums import AgentType, MessageRole, ProviderType
|
||||
from letta.schemas.letta_message_content import TextContent
|
||||
from letta.schemas.llm_config import LLMConfig
|
||||
from letta.schemas.message import Message, MessageCreate
|
||||
@@ -457,9 +457,63 @@ async def simple_summary(
|
||||
summarizer_llm_config.put_inner_thoughts_in_kwargs = False
|
||||
summarizer_llm_config.enable_reasoner = False
|
||||
|
||||
async def _run_summarizer_request(req_data: dict, req_messages_obj: list[Message]) -> str:
|
||||
"""Run summarization request and return assistant text.
|
||||
|
||||
For Anthropic, use provider-side streaming to avoid long-request failures
|
||||
(Anthropic requires streaming for requests that may exceed ~10 minutes).
|
||||
"""
|
||||
|
||||
if summarizer_llm_config.model_endpoint_type in [ProviderType.anthropic, ProviderType.bedrock]:
|
||||
logger.info(
|
||||
"Summarizer: using provider streaming (%s/%s) to avoid long-request failures",
|
||||
summarizer_llm_config.model_endpoint_type,
|
||||
summarizer_llm_config.model,
|
||||
)
|
||||
# Stream from provider and accumulate the final assistant text.
|
||||
from letta.interfaces.anthropic_parallel_tool_call_streaming_interface import (
|
||||
SimpleAnthropicStreamingInterface,
|
||||
)
|
||||
|
||||
interface = SimpleAnthropicStreamingInterface(
|
||||
requires_approval_tools=[],
|
||||
run_id=None,
|
||||
step_id=None,
|
||||
)
|
||||
|
||||
# AnthropicClient.stream_async sets request_data["stream"] = True internally.
|
||||
stream = await llm_client.stream_async(req_data, summarizer_llm_config)
|
||||
async for _chunk in interface.process(stream):
|
||||
# We don't emit anything; we just want the fully-accumulated content.
|
||||
pass
|
||||
|
||||
content_parts = interface.get_content()
|
||||
text = "".join(part.text for part in content_parts if isinstance(part, TextContent)).strip()
|
||||
if not text:
|
||||
logger.warning("No content returned from summarizer (streaming path)")
|
||||
raise Exception("Summary failed to generate")
|
||||
return text
|
||||
|
||||
# Default: non-streaming provider request, then normalize via chat-completions conversion.
|
||||
logger.debug(
|
||||
"Summarizer: using non-streaming request (%s/%s)",
|
||||
summarizer_llm_config.model_endpoint_type,
|
||||
summarizer_llm_config.model,
|
||||
)
|
||||
response_data = await llm_client.request_async(req_data, summarizer_llm_config)
|
||||
response = await llm_client.convert_response_to_chat_completion(
|
||||
response_data,
|
||||
req_messages_obj,
|
||||
summarizer_llm_config,
|
||||
)
|
||||
if response.choices[0].message.content is None:
|
||||
logger.warning("No content returned from summarizer")
|
||||
raise Exception("Summary failed to generate")
|
||||
return response.choices[0].message.content.strip()
|
||||
|
||||
request_data = llm_client.build_request_data(AgentType.letta_v1_agent, input_messages_obj, summarizer_llm_config, tools=[])
|
||||
try:
|
||||
response_data = await llm_client.request_async(request_data, summarizer_llm_config)
|
||||
summary = await _run_summarizer_request(request_data, input_messages_obj)
|
||||
except Exception as e:
|
||||
# handle LLM error (likely a context window exceeded error)
|
||||
try:
|
||||
@@ -497,7 +551,7 @@ async def simple_summary(
|
||||
)
|
||||
|
||||
try:
|
||||
response_data = await llm_client.request_async(request_data, summarizer_llm_config)
|
||||
summary = await _run_summarizer_request(request_data, input_messages_obj)
|
||||
except Exception as fallback_error_a:
|
||||
# Fallback B: hard-truncate the user transcript to fit a conservative char budget
|
||||
logger.warning(f"Clamped tool returns still overflowed ({fallback_error_a}). Falling back to transcript truncation.")
|
||||
@@ -534,21 +588,12 @@ async def simple_summary(
|
||||
tools=[],
|
||||
)
|
||||
try:
|
||||
response_data = await llm_client.request_async(request_data, summarizer_llm_config)
|
||||
summary = await _run_summarizer_request(request_data, input_messages_obj)
|
||||
except Exception as fallback_error_b:
|
||||
logger.error(f"Transcript truncation fallback also failed: {fallback_error_b}. Propagating error.")
|
||||
logger.info(f"Full fallback summarization payload: {request_data}")
|
||||
raise llm_client.handle_llm_error(fallback_error_b)
|
||||
|
||||
response = await llm_client.convert_response_to_chat_completion(response_data, input_messages_obj, summarizer_llm_config)
|
||||
if response.choices[0].message.content is None:
|
||||
logger.warning("No content returned from summarizer")
|
||||
# TODO raise an error error instead?
|
||||
# return "[Summary failed to generate]"
|
||||
raise Exception("Summary failed to generate")
|
||||
else:
|
||||
summary = response.choices[0].message.content.strip()
|
||||
|
||||
return summary
|
||||
|
||||
|
||||
|
||||
@@ -27,6 +27,8 @@ from letta.schemas.message import Message as PydanticMessage, MessageCreate
|
||||
from letta.schemas.run import Run as PydanticRun
|
||||
from letta.server.server import SyncServer
|
||||
from letta.services.run_manager import RunManager
|
||||
from letta.services.summarizer.summarizer import simple_summary
|
||||
from letta.settings import model_settings
|
||||
|
||||
# Constants
|
||||
DEFAULT_EMBEDDING_CONFIG = EmbeddingConfig.default_config(provider="openai")
|
||||
@@ -240,6 +242,49 @@ async def test_summarize_empty_message_buffer(server: SyncServer, actor, llm_con
|
||||
assert "No assistant message found" in str(e) or "empty" in str(e).lower()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skipif(
|
||||
not model_settings.anthropic_api_key,
|
||||
reason="Missing LETTA_ANTHROPIC_API_KEY (or equivalent settings) for Anthropic integration test",
|
||||
)
|
||||
async def test_simple_summary_anthropic_uses_streaming_and_returns_summary(actor, monkeypatch):
|
||||
"""Regression test: Anthropic summarization must use streaming and return real text."""
|
||||
|
||||
# If the summarizer ever falls back to a non-streaming Anthropic call, make it fail fast.
|
||||
from letta.llm_api.anthropic_client import AnthropicClient
|
||||
|
||||
async def _nope_request_async(self, *args, **kwargs):
|
||||
raise AssertionError("Anthropic summarizer should not call request_async (must use streaming)")
|
||||
|
||||
monkeypatch.setattr(AnthropicClient, "request_async", _nope_request_async)
|
||||
|
||||
# Keep the prompt tiny so this is fast and cheap.
|
||||
messages = [
|
||||
PydanticMessage(
|
||||
role=MessageRole.user,
|
||||
content=[TextContent(type="text", text="I'm planning a trip to Paris in April.")],
|
||||
),
|
||||
PydanticMessage(
|
||||
role=MessageRole.assistant,
|
||||
content=[
|
||||
TextContent(
|
||||
type="text",
|
||||
text="Great—your priorities are museums and cafes, and you want to stay under $200/day.",
|
||||
)
|
||||
],
|
||||
),
|
||||
]
|
||||
|
||||
anthropic_config = get_llm_config("claude-4-5-haiku.json")
|
||||
|
||||
summary = await simple_summary(messages=messages, llm_config=anthropic_config, actor=actor)
|
||||
|
||||
assert isinstance(summary, str)
|
||||
assert len(summary) > 10
|
||||
# Sanity-check that the model is summarizing the right conversation.
|
||||
assert any(token in summary.lower() for token in ["paris", "april", "museum", "cafe", "$200", "200"])
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"llm_config",
|
||||
|
||||
Reference in New Issue
Block a user