feat: move new streaming adapters into own files (#5001)

This commit is contained in:
cthomas
2025-09-29 20:24:44 -07:00
committed by Caren Thomas
parent 5502134e7a
commit 76d1bc8cbc
5 changed files with 271 additions and 257 deletions

View File

@@ -1,4 +1,3 @@
import asyncio
from typing import AsyncGenerator
from letta.adapters.letta_llm_adapter import LettaLLMAdapter
@@ -110,81 +109,3 @@ class LettaLLMRequestAdapter(LettaLLMAdapter):
),
label="create_provider_trace",
)
class SimpleLettaLLMRequestAdapter(LettaLLMRequestAdapter):
"""Simplifying assumptions:
- No inner thoughts in kwargs
- No forced tool calls
- Content native as assistant message
"""
async def invoke_llm(
self,
request_data: dict,
messages: list,
tools: list,
use_assistant_message: bool,
requires_approval_tools: list[str] = [],
step_id: str | None = None,
actor: str | None = None,
) -> AsyncGenerator[LettaMessage | None, None]:
"""
Execute a blocking LLM request and yield the response.
This adapter:
1. Makes a blocking request to the LLM
2. Converts the response to chat completion format
3. Extracts reasoning and tool call information
4. Updates all instance variables
5. Yields nothing (blocking mode doesn't stream)
"""
# Store request data
self.request_data = request_data
# Make the blocking LLM request
self.response_data = await self.llm_client.request_async(request_data, self.llm_config)
self.llm_request_finish_timestamp_ns = get_utc_timestamp_ns()
# Convert response to chat completion format
self.chat_completions_response = self.llm_client.convert_response_to_chat_completion(self.response_data, messages, self.llm_config)
# Extract reasoning content from the response
if self.chat_completions_response.choices[0].message.reasoning_content:
self.reasoning_content = [
ReasoningContent(
reasoning=self.chat_completions_response.choices[0].message.reasoning_content,
is_native=True,
signature=self.chat_completions_response.choices[0].message.reasoning_content_signature,
)
]
elif self.chat_completions_response.choices[0].message.omitted_reasoning_content:
self.reasoning_content = [OmittedReasoningContent()]
else:
# logger.info("No reasoning content found.")
self.reasoning_content = None
if self.chat_completions_response.choices[0].message.content:
# NOTE: big difference - 'content' goes into 'content'
# Reasoning placed into content for legacy reasons
self.content = [TextContent(text=self.chat_completions_response.choices[0].message.content)]
else:
self.content = None
# Extract tool call
if self.chat_completions_response.choices[0].message.tool_calls:
self.tool_call = self.chat_completions_response.choices[0].message.tool_calls[0]
else:
self.tool_call = None
# Extract usage statistics
self.usage.step_count = 1
self.usage.completion_tokens = self.chat_completions_response.usage.completion_tokens
self.usage.prompt_tokens = self.chat_completions_response.usage.prompt_tokens
self.usage.total_tokens = self.chat_completions_response.usage.total_tokens
self.log_provider_trace(step_id=step_id, actor=actor)
yield None
return

View File

@@ -1,18 +1,12 @@
import asyncio
from typing import AsyncGenerator, List
from typing import AsyncGenerator
from letta.adapters.letta_llm_adapter import LettaLLMAdapter
from letta.helpers.datetime_helpers import get_utc_timestamp_ns
from letta.interfaces.anthropic_streaming_interface import AnthropicStreamingInterface, SimpleAnthropicStreamingInterface
from letta.interfaces.openai_streaming_interface import (
OpenAIStreamingInterface,
SimpleOpenAIResponsesStreamingInterface,
SimpleOpenAIStreamingInterface,
)
from letta.interfaces.anthropic_streaming_interface import AnthropicStreamingInterface
from letta.interfaces.openai_streaming_interface import OpenAIStreamingInterface
from letta.llm_api.llm_client_base import LLMClientBase
from letta.schemas.enums import ProviderType
from letta.schemas.letta_message import LettaMessage
from letta.schemas.letta_message_content import SummarizedReasoningContent, TextContent
from letta.schemas.llm_config import LLMConfig
from letta.schemas.provider_trace import ProviderTraceCreate
from letta.schemas.usage import LettaUsageStatistics
@@ -174,168 +168,3 @@ class LettaLLMStreamAdapter(LettaLLMAdapter):
),
label="create_provider_trace",
)
class SimpleLettaLLMStreamAdapter(LettaLLMStreamAdapter):
"""
Adapter for handling streaming LLM requests with immediate token yielding.
This adapter supports real-time streaming of tokens from the LLM, providing
minimal time-to-first-token (TTFT) latency. It uses specialized streaming
interfaces for different providers (OpenAI, Anthropic) to handle their
specific streaming formats.
"""
async def invoke_llm(
self,
request_data: dict,
messages: list,
tools: list,
use_assistant_message: bool, # NOTE: not used
requires_approval_tools: list[str] = [],
step_id: str | None = None,
actor: User | None = None,
) -> AsyncGenerator[LettaMessage, None]:
"""
Execute a streaming LLM request and yield tokens/chunks as they arrive.
This adapter:
1. Makes a streaming request to the LLM
2. Yields chunks immediately for minimal TTFT
3. Accumulates response data through the streaming interface
4. Updates all instance variables after streaming completes
"""
# Store request data
self.request_data = request_data
# Instantiate streaming interface
if self.llm_config.model_endpoint_type in [ProviderType.anthropic, ProviderType.bedrock]:
# NOTE: different
self.interface = SimpleAnthropicStreamingInterface(
requires_approval_tools=requires_approval_tools,
)
elif self.llm_config.model_endpoint_type == ProviderType.openai:
# Decide interface based on payload shape
use_responses = "input" in request_data and "messages" not in request_data
# No support for Responses API proxy
is_proxy = self.llm_config.provider_name == "lmstudio_openai"
if use_responses and not is_proxy:
self.interface = SimpleOpenAIResponsesStreamingInterface(
is_openai_proxy=False,
messages=messages,
tools=tools,
requires_approval_tools=requires_approval_tools,
)
else:
self.interface = SimpleOpenAIStreamingInterface(
is_openai_proxy=self.llm_config.provider_name == "lmstudio_openai",
messages=messages,
tools=tools,
requires_approval_tools=requires_approval_tools,
model=self.llm_config.model,
)
else:
raise ValueError(f"Streaming not supported for provider {self.llm_config.model_endpoint_type}")
# Extract optional parameters
# ttft_span = kwargs.get('ttft_span', None)
# Start the streaming request
stream = await self.llm_client.stream_async(request_data, self.llm_config)
# Process the stream and yield chunks immediately for TTFT
async for chunk in self.interface.process(stream): # TODO: add ttft span
# Yield each chunk immediately as it arrives
yield chunk
# After streaming completes, extract the accumulated data
self.llm_request_finish_timestamp_ns = get_utc_timestamp_ns()
# Extract tool call from the interface
try:
self.tool_call = self.interface.get_tool_call_object()
except ValueError as e:
# No tool call, handle upstream
self.tool_call = None
# Extract reasoning content from the interface
# TODO this should probably just be called "content"?
# self.reasoning_content = self.interface.get_reasoning_content()
# Extract non-reasoning content (eg text)
self.content: List[TextContent | SummarizedReasoningContent] = self.interface.get_content()
# Extract usage statistics
# Some providers don't provide usage in streaming, use fallback if needed
if hasattr(self.interface, "input_tokens") and hasattr(self.interface, "output_tokens"):
# Handle cases where tokens might not be set (e.g., LMStudio)
input_tokens = self.interface.input_tokens
output_tokens = self.interface.output_tokens
# Fallback to estimated values if not provided
if not input_tokens and hasattr(self.interface, "fallback_input_tokens"):
input_tokens = self.interface.fallback_input_tokens
if not output_tokens and hasattr(self.interface, "fallback_output_tokens"):
output_tokens = self.interface.fallback_output_tokens
self.usage = LettaUsageStatistics(
step_count=1,
completion_tokens=output_tokens or 0,
prompt_tokens=input_tokens or 0,
total_tokens=(input_tokens or 0) + (output_tokens or 0),
)
else:
# Default usage statistics if not available
self.usage = LettaUsageStatistics(step_count=1, completion_tokens=0, prompt_tokens=0, total_tokens=0)
# Store any additional data from the interface
self.message_id = self.interface.letta_message_id
# Log request and response data
self.log_provider_trace(step_id=step_id, actor=actor)
def log_provider_trace(self, step_id: str | None, actor: User | None) -> None:
"""
Log provider trace data for telemetry purposes in a fire-and-forget manner.
Creates an async task to log the request/response data without blocking
the main execution flow. For streaming adapters, this includes the final
tool call and reasoning content collected during streaming.
Args:
step_id: The step ID associated with this request for logging purposes
actor: The user associated with this request for logging purposes
"""
if step_id is None or actor is None or not settings.track_provider_trace:
return
safe_create_task(
self.telemetry_manager.create_provider_trace_async(
actor=actor,
provider_trace_create=ProviderTraceCreate(
request_json=self.request_data,
response_json={
"content": {
"tool_call": self.tool_call.model_dump_json() if self.tool_call else None,
# "reasoning": [content.model_dump_json() for content in self.reasoning_content],
# NOTE: different
# TODO potentially split this into both content and reasoning?
"content": [content.model_dump_json() for content in self.content],
},
"id": self.interface.message_id,
"model": self.interface.model,
"role": "assistant",
# "stop_reason": "",
# "stop_sequence": None,
"type": "message",
"usage": {
"input_tokens": self.usage.prompt_tokens,
"output_tokens": self.usage.completion_tokens,
},
},
step_id=step_id, # Use original step_id for telemetry
organization_id=actor.organization_id,
),
),
label="create_provider_trace",
)

View File

@@ -0,0 +1,84 @@
from typing import AsyncGenerator
from letta.adapters.letta_llm_request_adapter import LettaLLMRequestAdapter
from letta.helpers.datetime_helpers import get_utc_timestamp_ns
from letta.schemas.letta_message import LettaMessage
from letta.schemas.letta_message_content import OmittedReasoningContent, ReasoningContent, TextContent
class SimpleLLMRequestAdapter(LettaLLMRequestAdapter):
"""Simplifying assumptions:
- No inner thoughts in kwargs
- No forced tool calls
- Content native as assistant message
"""
async def invoke_llm(
self,
request_data: dict,
messages: list,
tools: list,
use_assistant_message: bool,
requires_approval_tools: list[str] = [],
step_id: str | None = None,
actor: str | None = None,
) -> AsyncGenerator[LettaMessage | None, None]:
"""
Execute a blocking LLM request and yield the response.
This adapter:
1. Makes a blocking request to the LLM
2. Converts the response to chat completion format
3. Extracts reasoning and tool call information
4. Updates all instance variables
5. Yields nothing (blocking mode doesn't stream)
"""
# Store request data
self.request_data = request_data
# Make the blocking LLM request
self.response_data = await self.llm_client.request_async(request_data, self.llm_config)
self.llm_request_finish_timestamp_ns = get_utc_timestamp_ns()
# Convert response to chat completion format
self.chat_completions_response = self.llm_client.convert_response_to_chat_completion(self.response_data, messages, self.llm_config)
# Extract reasoning content from the response
if self.chat_completions_response.choices[0].message.reasoning_content:
self.reasoning_content = [
ReasoningContent(
reasoning=self.chat_completions_response.choices[0].message.reasoning_content,
is_native=True,
signature=self.chat_completions_response.choices[0].message.reasoning_content_signature,
)
]
elif self.chat_completions_response.choices[0].message.omitted_reasoning_content:
self.reasoning_content = [OmittedReasoningContent()]
else:
# logger.info("No reasoning content found.")
self.reasoning_content = None
if self.chat_completions_response.choices[0].message.content:
# NOTE: big difference - 'content' goes into 'content'
# Reasoning placed into content for legacy reasons
self.content = [TextContent(text=self.chat_completions_response.choices[0].message.content)]
else:
self.content = None
# Extract tool call
if self.chat_completions_response.choices[0].message.tool_calls:
self.tool_call = self.chat_completions_response.choices[0].message.tool_calls[0]
else:
self.tool_call = None
# Extract usage statistics
self.usage.step_count = 1
self.usage.completion_tokens = self.chat_completions_response.usage.completion_tokens
self.usage.prompt_tokens = self.chat_completions_response.usage.prompt_tokens
self.usage.total_tokens = self.chat_completions_response.usage.total_tokens
self.log_provider_trace(step_id=step_id, actor=actor)
yield None
return

View File

@@ -0,0 +1,179 @@
from typing import AsyncGenerator, List
from letta.adapters.letta_llm_stream_adapter import LettaLLMStreamAdapter
from letta.helpers.datetime_helpers import get_utc_timestamp_ns
from letta.interfaces.anthropic_streaming_interface import SimpleAnthropicStreamingInterface
from letta.interfaces.openai_streaming_interface import SimpleOpenAIStreamingInterface
from letta.schemas.enums import ProviderType
from letta.schemas.letta_message import LettaMessage
from letta.schemas.letta_message_content import SummarizedReasoningContent, TextContent
from letta.schemas.provider_trace import ProviderTraceCreate
from letta.schemas.usage import LettaUsageStatistics
from letta.schemas.user import User
from letta.settings import settings
from letta.utils import safe_create_task
class SimpleLLMStreamAdapter(LettaLLMStreamAdapter):
"""
Adapter for handling streaming LLM requests with immediate token yielding.
This adapter supports real-time streaming of tokens from the LLM, providing
minimal time-to-first-token (TTFT) latency. It uses specialized streaming
interfaces for different providers (OpenAI, Anthropic) to handle their
specific streaming formats.
"""
async def invoke_llm(
self,
request_data: dict,
messages: list,
tools: list,
use_assistant_message: bool, # NOTE: not used
requires_approval_tools: list[str] = [],
step_id: str | None = None,
actor: User | None = None,
) -> AsyncGenerator[LettaMessage, None]:
"""
Execute a streaming LLM request and yield tokens/chunks as they arrive.
This adapter:
1. Makes a streaming request to the LLM
2. Yields chunks immediately for minimal TTFT
3. Accumulates response data through the streaming interface
4. Updates all instance variables after streaming completes
"""
# Store request data
self.request_data = request_data
# Instantiate streaming interface
if self.llm_config.model_endpoint_type in [ProviderType.anthropic, ProviderType.bedrock]:
# NOTE: different
self.interface = SimpleAnthropicStreamingInterface(
requires_approval_tools=requires_approval_tools,
)
elif self.llm_config.model_endpoint_type == ProviderType.openai:
# Decide interface based on payload shape
use_responses = "input" in request_data and "messages" not in request_data
# No support for Responses API proxy
is_proxy = self.llm_config.provider_name == "lmstudio_openai"
if use_responses and not is_proxy:
self.interface = SimpleOpenAIResponsesStreamingInterface(
is_openai_proxy=False,
messages=messages,
tools=tools,
requires_approval_tools=requires_approval_tools,
)
else:
self.interface = SimpleOpenAIStreamingInterface(
is_openai_proxy=self.llm_config.provider_name == "lmstudio_openai",
messages=messages,
tools=tools,
requires_approval_tools=requires_approval_tools,
model=self.llm_config.model,
)
else:
raise ValueError(f"Streaming not supported for provider {self.llm_config.model_endpoint_type}")
# Extract optional parameters
# ttft_span = kwargs.get('ttft_span', None)
# Start the streaming request
stream = await self.llm_client.stream_async(request_data, self.llm_config)
# Process the stream and yield chunks immediately for TTFT
async for chunk in self.interface.process(stream): # TODO: add ttft span
# Yield each chunk immediately as it arrives
yield chunk
# After streaming completes, extract the accumulated data
self.llm_request_finish_timestamp_ns = get_utc_timestamp_ns()
# Extract tool call from the interface
try:
self.tool_call = self.interface.get_tool_call_object()
except ValueError as e:
# No tool call, handle upstream
self.tool_call = None
# Extract reasoning content from the interface
# TODO this should probably just be called "content"?
# self.reasoning_content = self.interface.get_reasoning_content()
# Extract non-reasoning content (eg text)
self.content: List[TextContent | SummarizedReasoningContent] = self.interface.get_content()
# Extract usage statistics
# Some providers don't provide usage in streaming, use fallback if needed
if hasattr(self.interface, "input_tokens") and hasattr(self.interface, "output_tokens"):
# Handle cases where tokens might not be set (e.g., LMStudio)
input_tokens = self.interface.input_tokens
output_tokens = self.interface.output_tokens
# Fallback to estimated values if not provided
if not input_tokens and hasattr(self.interface, "fallback_input_tokens"):
input_tokens = self.interface.fallback_input_tokens
if not output_tokens and hasattr(self.interface, "fallback_output_tokens"):
output_tokens = self.interface.fallback_output_tokens
self.usage = LettaUsageStatistics(
step_count=1,
completion_tokens=output_tokens or 0,
prompt_tokens=input_tokens or 0,
total_tokens=(input_tokens or 0) + (output_tokens or 0),
)
else:
# Default usage statistics if not available
self.usage = LettaUsageStatistics(step_count=1, completion_tokens=0, prompt_tokens=0, total_tokens=0)
# Store any additional data from the interface
self.message_id = self.interface.letta_message_id
# Log request and response data
self.log_provider_trace(step_id=step_id, actor=actor)
def log_provider_trace(self, step_id: str | None, actor: User | None) -> None:
"""
Log provider trace data for telemetry purposes in a fire-and-forget manner.
Creates an async task to log the request/response data without blocking
the main execution flow. For streaming adapters, this includes the final
tool call and reasoning content collected during streaming.
Args:
step_id: The step ID associated with this request for logging purposes
actor: The user associated with this request for logging purposes
"""
if step_id is None or actor is None or not settings.track_provider_trace:
return
safe_create_task(
self.telemetry_manager.create_provider_trace_async(
actor=actor,
provider_trace_create=ProviderTraceCreate(
request_json=self.request_data,
response_json={
"content": {
"tool_call": self.tool_call.model_dump_json() if self.tool_call else None,
# "reasoning": [content.model_dump_json() for content in self.reasoning_content],
# NOTE: different
# TODO potentially split this into both content and reasoning?
"content": [content.model_dump_json() for content in self.content],
},
"id": self.interface.message_id,
"model": self.interface.model,
"role": "assistant",
# "stop_reason": "",
# "stop_sequence": None,
"type": "message",
"usage": {
"input_tokens": self.usage.prompt_tokens,
"output_tokens": self.usage.completion_tokens,
},
},
step_id=step_id, # Use original step_id for telemetry
organization_id=actor.organization_id,
),
),
label="create_provider_trace",
)

View File

@@ -4,8 +4,9 @@ from typing import AsyncGenerator, Optional
from opentelemetry.trace import Span
from letta.adapters.letta_llm_adapter import LettaLLMAdapter
from letta.adapters.letta_llm_request_adapter import LettaLLMRequestAdapter, SimpleLettaLLMRequestAdapter
from letta.adapters.letta_llm_stream_adapter import SimpleLettaLLMStreamAdapter
from letta.adapters.letta_llm_request_adapter import LettaLLMRequestAdapter
from letta.adapters.simple_llm_request_adapter import SimpleLLMRequestAdapter
from letta.adapters.simple_llm_stream_adapter import SimpleLLMStreamAdapter
from letta.agents.helpers import (
_build_rule_violation_result,
_load_last_function_response,
@@ -162,12 +163,12 @@ class LettaAgentV3(LettaAgentV2):
first_chunk = True
if stream_tokens:
llm_adapter = SimpleLettaLLMStreamAdapter(
llm_adapter = SimpleLLMStreamAdapter(
llm_client=self.llm_client,
llm_config=self.agent_state.llm_config,
)
else:
llm_adapter = SimpleLettaLLMRequestAdapter(
llm_adapter = SimpleLLMRequestAdapter(
llm_client=self.llm_client,
llm_config=self.agent_state.llm_config,
)