From 76d1bc8cbcd38e376477b306c5d2dbc944e548e4 Mon Sep 17 00:00:00 2001 From: cthomas Date: Mon, 29 Sep 2025 20:24:44 -0700 Subject: [PATCH] feat: move new streaming adapters into own files (#5001) --- letta/adapters/letta_llm_request_adapter.py | 79 -------- letta/adapters/letta_llm_stream_adapter.py | 177 +----------------- letta/adapters/simple_llm_request_adapter.py | 84 +++++++++ letta/adapters/simple_llm_stream_adapter.py | 179 +++++++++++++++++++ letta/agents/letta_agent_v3.py | 9 +- 5 files changed, 271 insertions(+), 257 deletions(-) create mode 100644 letta/adapters/simple_llm_request_adapter.py create mode 100644 letta/adapters/simple_llm_stream_adapter.py diff --git a/letta/adapters/letta_llm_request_adapter.py b/letta/adapters/letta_llm_request_adapter.py index 07519e8a..e2166cec 100644 --- a/letta/adapters/letta_llm_request_adapter.py +++ b/letta/adapters/letta_llm_request_adapter.py @@ -1,4 +1,3 @@ -import asyncio from typing import AsyncGenerator from letta.adapters.letta_llm_adapter import LettaLLMAdapter @@ -110,81 +109,3 @@ class LettaLLMRequestAdapter(LettaLLMAdapter): ), label="create_provider_trace", ) - - -class SimpleLettaLLMRequestAdapter(LettaLLMRequestAdapter): - """Simplifying assumptions: - - - No inner thoughts in kwargs - - No forced tool calls - - Content native as assistant message - """ - - async def invoke_llm( - self, - request_data: dict, - messages: list, - tools: list, - use_assistant_message: bool, - requires_approval_tools: list[str] = [], - step_id: str | None = None, - actor: str | None = None, - ) -> AsyncGenerator[LettaMessage | None, None]: - """ - Execute a blocking LLM request and yield the response. - - This adapter: - 1. Makes a blocking request to the LLM - 2. Converts the response to chat completion format - 3. Extracts reasoning and tool call information - 4. Updates all instance variables - 5. 
Yields nothing (blocking mode doesn't stream) - """ - # Store request data - self.request_data = request_data - - # Make the blocking LLM request - self.response_data = await self.llm_client.request_async(request_data, self.llm_config) - self.llm_request_finish_timestamp_ns = get_utc_timestamp_ns() - - # Convert response to chat completion format - self.chat_completions_response = self.llm_client.convert_response_to_chat_completion(self.response_data, messages, self.llm_config) - - # Extract reasoning content from the response - if self.chat_completions_response.choices[0].message.reasoning_content: - self.reasoning_content = [ - ReasoningContent( - reasoning=self.chat_completions_response.choices[0].message.reasoning_content, - is_native=True, - signature=self.chat_completions_response.choices[0].message.reasoning_content_signature, - ) - ] - elif self.chat_completions_response.choices[0].message.omitted_reasoning_content: - self.reasoning_content = [OmittedReasoningContent()] - else: - # logger.info("No reasoning content found.") - self.reasoning_content = None - - if self.chat_completions_response.choices[0].message.content: - # NOTE: big difference - 'content' goes into 'content' - # Reasoning placed into content for legacy reasons - self.content = [TextContent(text=self.chat_completions_response.choices[0].message.content)] - else: - self.content = None - - # Extract tool call - if self.chat_completions_response.choices[0].message.tool_calls: - self.tool_call = self.chat_completions_response.choices[0].message.tool_calls[0] - else: - self.tool_call = None - - # Extract usage statistics - self.usage.step_count = 1 - self.usage.completion_tokens = self.chat_completions_response.usage.completion_tokens - self.usage.prompt_tokens = self.chat_completions_response.usage.prompt_tokens - self.usage.total_tokens = self.chat_completions_response.usage.total_tokens - - self.log_provider_trace(step_id=step_id, actor=actor) - - yield None - return diff --git a/letta/adapters/letta_llm_stream_adapter.py b/letta/adapters/letta_llm_stream_adapter.py index 84f337f9..9f17d5ae 100644 --- a/letta/adapters/letta_llm_stream_adapter.py +++ b/letta/adapters/letta_llm_stream_adapter.py @@ -1,18 +1,12 @@ -import asyncio -from typing import AsyncGenerator, List +from typing import AsyncGenerator from letta.adapters.letta_llm_adapter import LettaLLMAdapter from letta.helpers.datetime_helpers import get_utc_timestamp_ns -from letta.interfaces.anthropic_streaming_interface import AnthropicStreamingInterface, SimpleAnthropicStreamingInterface -from letta.interfaces.openai_streaming_interface import ( - OpenAIStreamingInterface, - SimpleOpenAIResponsesStreamingInterface, - SimpleOpenAIStreamingInterface, -) +from letta.interfaces.anthropic_streaming_interface import AnthropicStreamingInterface +from letta.interfaces.openai_streaming_interface import OpenAIStreamingInterface from letta.llm_api.llm_client_base import LLMClientBase from letta.schemas.enums import ProviderType from letta.schemas.letta_message import LettaMessage -from letta.schemas.letta_message_content import SummarizedReasoningContent, TextContent from letta.schemas.llm_config import LLMConfig from letta.schemas.provider_trace import ProviderTraceCreate from letta.schemas.usage import LettaUsageStatistics @@ -174,168 +168,3 @@ class LettaLLMStreamAdapter(LettaLLMAdapter): ), label="create_provider_trace", ) - - -class SimpleLettaLLMStreamAdapter(LettaLLMStreamAdapter): - """ - Adapter for handling streaming LLM requests with immediate token yielding. 
- - This adapter supports real-time streaming of tokens from the LLM, providing - minimal time-to-first-token (TTFT) latency. It uses specialized streaming - interfaces for different providers (OpenAI, Anthropic) to handle their - specific streaming formats. - """ - - async def invoke_llm( - self, - request_data: dict, - messages: list, - tools: list, - use_assistant_message: bool, # NOTE: not used - requires_approval_tools: list[str] = [], - step_id: str | None = None, - actor: User | None = None, - ) -> AsyncGenerator[LettaMessage, None]: - """ - Execute a streaming LLM request and yield tokens/chunks as they arrive. - - This adapter: - 1. Makes a streaming request to the LLM - 2. Yields chunks immediately for minimal TTFT - 3. Accumulates response data through the streaming interface - 4. Updates all instance variables after streaming completes - """ - # Store request data - self.request_data = request_data - - # Instantiate streaming interface - if self.llm_config.model_endpoint_type in [ProviderType.anthropic, ProviderType.bedrock]: - # NOTE: different - self.interface = SimpleAnthropicStreamingInterface( - requires_approval_tools=requires_approval_tools, - ) - elif self.llm_config.model_endpoint_type == ProviderType.openai: - # Decide interface based on payload shape - use_responses = "input" in request_data and "messages" not in request_data - # No support for Responses API proxy - is_proxy = self.llm_config.provider_name == "lmstudio_openai" - if use_responses and not is_proxy: - self.interface = SimpleOpenAIResponsesStreamingInterface( - is_openai_proxy=False, - messages=messages, - tools=tools, - requires_approval_tools=requires_approval_tools, - ) - else: - self.interface = SimpleOpenAIStreamingInterface( - is_openai_proxy=self.llm_config.provider_name == "lmstudio_openai", - messages=messages, - tools=tools, - requires_approval_tools=requires_approval_tools, - model=self.llm_config.model, - ) - else: - raise ValueError(f"Streaming not supported for provider {self.llm_config.model_endpoint_type}") - - # Extract optional parameters - # ttft_span = kwargs.get('ttft_span', None) - - # Start the streaming request - stream = await self.llm_client.stream_async(request_data, self.llm_config) - - # Process the stream and yield chunks immediately for TTFT - async for chunk in self.interface.process(stream): # TODO: add ttft span - # Yield each chunk immediately as it arrives - yield chunk - - # After streaming completes, extract the accumulated data - self.llm_request_finish_timestamp_ns = get_utc_timestamp_ns() - - # Extract tool call from the interface - try: - self.tool_call = self.interface.get_tool_call_object() - except ValueError as e: - # No tool call, handle upstream - self.tool_call = None - - # Extract reasoning content from the interface - # TODO this should probably just be called "content"? 
- # self.reasoning_content = self.interface.get_reasoning_content() - - # Extract non-reasoning content (eg text) - self.content: List[TextContent | SummarizedReasoningContent] = self.interface.get_content() - - # Extract usage statistics - # Some providers don't provide usage in streaming, use fallback if needed - if hasattr(self.interface, "input_tokens") and hasattr(self.interface, "output_tokens"): - # Handle cases where tokens might not be set (e.g., LMStudio) - input_tokens = self.interface.input_tokens - output_tokens = self.interface.output_tokens - - # Fallback to estimated values if not provided - if not input_tokens and hasattr(self.interface, "fallback_input_tokens"): - input_tokens = self.interface.fallback_input_tokens - if not output_tokens and hasattr(self.interface, "fallback_output_tokens"): - output_tokens = self.interface.fallback_output_tokens - - self.usage = LettaUsageStatistics( - step_count=1, - completion_tokens=output_tokens or 0, - prompt_tokens=input_tokens or 0, - total_tokens=(input_tokens or 0) + (output_tokens or 0), - ) - else: - # Default usage statistics if not available - self.usage = LettaUsageStatistics(step_count=1, completion_tokens=0, prompt_tokens=0, total_tokens=0) - - # Store any additional data from the interface - self.message_id = self.interface.letta_message_id - - # Log request and response data - self.log_provider_trace(step_id=step_id, actor=actor) - - def log_provider_trace(self, step_id: str | None, actor: User | None) -> None: - """ - Log provider trace data for telemetry purposes in a fire-and-forget manner. - - Creates an async task to log the request/response data without blocking - the main execution flow. For streaming adapters, this includes the final - tool call and reasoning content collected during streaming. - - Args: - step_id: The step ID associated with this request for logging purposes - actor: The user associated with this request for logging purposes - """ - if step_id is None or actor is None or not settings.track_provider_trace: - return - - safe_create_task( - self.telemetry_manager.create_provider_trace_async( - actor=actor, - provider_trace_create=ProviderTraceCreate( - request_json=self.request_data, - response_json={ - "content": { - "tool_call": self.tool_call.model_dump_json() if self.tool_call else None, - # "reasoning": [content.model_dump_json() for content in self.reasoning_content], - # NOTE: different - # TODO potentially split this into both content and reasoning? 
- "content": [content.model_dump_json() for content in self.content], - }, - "id": self.interface.message_id, - "model": self.interface.model, - "role": "assistant", - # "stop_reason": "", - # "stop_sequence": None, - "type": "message", - "usage": { - "input_tokens": self.usage.prompt_tokens, - "output_tokens": self.usage.completion_tokens, - }, - }, - step_id=step_id, # Use original step_id for telemetry - organization_id=actor.organization_id, - ), - ), - label="create_provider_trace", - ) diff --git a/letta/adapters/simple_llm_request_adapter.py b/letta/adapters/simple_llm_request_adapter.py new file mode 100644 index 00000000..1e436527 --- /dev/null +++ b/letta/adapters/simple_llm_request_adapter.py @@ -0,0 +1,84 @@ +from typing import AsyncGenerator + +from letta.adapters.letta_llm_request_adapter import LettaLLMRequestAdapter +from letta.helpers.datetime_helpers import get_utc_timestamp_ns +from letta.schemas.letta_message import LettaMessage +from letta.schemas.letta_message_content import OmittedReasoningContent, ReasoningContent, TextContent + + +class SimpleLLMRequestAdapter(LettaLLMRequestAdapter): + """Simplifying assumptions: + + - No inner thoughts in kwargs + - No forced tool calls + - Content native as assistant message + """ + + async def invoke_llm( + self, + request_data: dict, + messages: list, + tools: list, + use_assistant_message: bool, + requires_approval_tools: list[str] = [], + step_id: str | None = None, + actor: str | None = None, + ) -> AsyncGenerator[LettaMessage | None, None]: + """ + Execute a blocking LLM request and yield the response. + + This adapter: + 1. Makes a blocking request to the LLM + 2. Converts the response to chat completion format + 3. Extracts reasoning and tool call information + 4. Updates all instance variables + 5. 
Yields nothing (blocking mode doesn't stream)
+        """
+        # Store request data
+        self.request_data = request_data
+
+        # Make the blocking LLM request
+        self.response_data = await self.llm_client.request_async(request_data, self.llm_config)
+        self.llm_request_finish_timestamp_ns = get_utc_timestamp_ns()
+
+        # Convert response to chat completion format
+        self.chat_completions_response = self.llm_client.convert_response_to_chat_completion(self.response_data, messages, self.llm_config)
+
+        # Extract reasoning content from the response
+        if self.chat_completions_response.choices[0].message.reasoning_content:
+            self.reasoning_content = [
+                ReasoningContent(
+                    reasoning=self.chat_completions_response.choices[0].message.reasoning_content,
+                    is_native=True,
+                    signature=self.chat_completions_response.choices[0].message.reasoning_content_signature,
+                )
+            ]
+        elif self.chat_completions_response.choices[0].message.omitted_reasoning_content:
+            self.reasoning_content = [OmittedReasoningContent()]
+        else:
+            # logger.info("No reasoning content found.")
+            self.reasoning_content = None
+
+        if self.chat_completions_response.choices[0].message.content:
+            # NOTE: big difference from the legacy adapter - native assistant 'content' goes into 'content'
+            # (previously, reasoning was placed into content for legacy reasons)
+            self.content = [TextContent(text=self.chat_completions_response.choices[0].message.content)]
+        else:
+            self.content = None
+
+        # Extract tool call
+        if self.chat_completions_response.choices[0].message.tool_calls:
+            self.tool_call = self.chat_completions_response.choices[0].message.tool_calls[0]
+        else:
+            self.tool_call = None
+
+        # Extract usage statistics
+        self.usage.step_count = 1
+        self.usage.completion_tokens = self.chat_completions_response.usage.completion_tokens
+        self.usage.prompt_tokens = self.chat_completions_response.usage.prompt_tokens
+        self.usage.total_tokens = self.chat_completions_response.usage.total_tokens
+
+        self.log_provider_trace(step_id=step_id, actor=actor)
+
+        yield None
+        return
diff --git a/letta/adapters/simple_llm_stream_adapter.py b/letta/adapters/simple_llm_stream_adapter.py
new file mode 100644
index 00000000..0022d84a
--- /dev/null
+++ b/letta/adapters/simple_llm_stream_adapter.py
@@ -0,0 +1,179 @@
+from typing import AsyncGenerator, List
+
+from letta.adapters.letta_llm_stream_adapter import LettaLLMStreamAdapter
+from letta.helpers.datetime_helpers import get_utc_timestamp_ns
+from letta.interfaces.anthropic_streaming_interface import SimpleAnthropicStreamingInterface
+from letta.interfaces.openai_streaming_interface import SimpleOpenAIResponsesStreamingInterface, SimpleOpenAIStreamingInterface
+from letta.schemas.enums import ProviderType
+from letta.schemas.letta_message import LettaMessage
+from letta.schemas.letta_message_content import SummarizedReasoningContent, TextContent
+from letta.schemas.provider_trace import ProviderTraceCreate
+from letta.schemas.usage import LettaUsageStatistics
+from letta.schemas.user import User
+from letta.settings import settings
+from letta.utils import safe_create_task
+
+
+class SimpleLLMStreamAdapter(LettaLLMStreamAdapter):
+    """
+    Adapter for handling streaming LLM requests with immediate token yielding.
+
+    This adapter supports real-time streaming of tokens from the LLM, providing
+    minimal time-to-first-token (TTFT) latency. It uses specialized streaming
+    interfaces for different providers (OpenAI, Anthropic) to handle their
+    specific streaming formats. 
+ """ + + async def invoke_llm( + self, + request_data: dict, + messages: list, + tools: list, + use_assistant_message: bool, # NOTE: not used + requires_approval_tools: list[str] = [], + step_id: str | None = None, + actor: User | None = None, + ) -> AsyncGenerator[LettaMessage, None]: + """ + Execute a streaming LLM request and yield tokens/chunks as they arrive. + + This adapter: + 1. Makes a streaming request to the LLM + 2. Yields chunks immediately for minimal TTFT + 3. Accumulates response data through the streaming interface + 4. Updates all instance variables after streaming completes + """ + # Store request data + self.request_data = request_data + + # Instantiate streaming interface + if self.llm_config.model_endpoint_type in [ProviderType.anthropic, ProviderType.bedrock]: + # NOTE: different + self.interface = SimpleAnthropicStreamingInterface( + requires_approval_tools=requires_approval_tools, + ) + elif self.llm_config.model_endpoint_type == ProviderType.openai: + # Decide interface based on payload shape + use_responses = "input" in request_data and "messages" not in request_data + # No support for Responses API proxy + is_proxy = self.llm_config.provider_name == "lmstudio_openai" + if use_responses and not is_proxy: + self.interface = SimpleOpenAIResponsesStreamingInterface( + is_openai_proxy=False, + messages=messages, + tools=tools, + requires_approval_tools=requires_approval_tools, + ) + else: + self.interface = SimpleOpenAIStreamingInterface( + is_openai_proxy=self.llm_config.provider_name == "lmstudio_openai", + messages=messages, + tools=tools, + requires_approval_tools=requires_approval_tools, + model=self.llm_config.model, + ) + else: + raise ValueError(f"Streaming not supported for provider {self.llm_config.model_endpoint_type}") + + # Extract optional parameters + # ttft_span = kwargs.get('ttft_span', None) + + # Start the streaming request + stream = await self.llm_client.stream_async(request_data, self.llm_config) + + # Process the stream and yield chunks immediately for TTFT + async for chunk in self.interface.process(stream): # TODO: add ttft span + # Yield each chunk immediately as it arrives + yield chunk + + # After streaming completes, extract the accumulated data + self.llm_request_finish_timestamp_ns = get_utc_timestamp_ns() + + # Extract tool call from the interface + try: + self.tool_call = self.interface.get_tool_call_object() + except ValueError as e: + # No tool call, handle upstream + self.tool_call = None + + # Extract reasoning content from the interface + # TODO this should probably just be called "content"? 
+        # self.reasoning_content = self.interface.get_reasoning_content()
+
+        # Extract non-reasoning content (e.g. text)
+        self.content: List[TextContent | SummarizedReasoningContent] = self.interface.get_content()
+
+        # Extract usage statistics
+        # Some providers don't provide usage in streaming; use fallback values if needed
+        if hasattr(self.interface, "input_tokens") and hasattr(self.interface, "output_tokens"):
+            # Handle cases where tokens might not be set (e.g., LMStudio)
+            input_tokens = self.interface.input_tokens
+            output_tokens = self.interface.output_tokens
+
+            # Fallback to estimated values if not provided
+            if not input_tokens and hasattr(self.interface, "fallback_input_tokens"):
+                input_tokens = self.interface.fallback_input_tokens
+            if not output_tokens and hasattr(self.interface, "fallback_output_tokens"):
+                output_tokens = self.interface.fallback_output_tokens
+
+            self.usage = LettaUsageStatistics(
+                step_count=1,
+                completion_tokens=output_tokens or 0,
+                prompt_tokens=input_tokens or 0,
+                total_tokens=(input_tokens or 0) + (output_tokens or 0),
+            )
+        else:
+            # Default usage statistics if not available
+            self.usage = LettaUsageStatistics(step_count=1, completion_tokens=0, prompt_tokens=0, total_tokens=0)
+
+        # Store any additional data from the interface
+        self.message_id = self.interface.letta_message_id
+
+        # Log request and response data
+        self.log_provider_trace(step_id=step_id, actor=actor)
+
+    def log_provider_trace(self, step_id: str | None, actor: User | None) -> None:
+        """
+        Log provider trace data for telemetry purposes in a fire-and-forget manner.
+
+        Creates an async task to log the request/response data without blocking
+        the main execution flow. For streaming adapters, this includes the final
+        tool call and reasoning content collected during streaming.
+
+        Args:
+            step_id: The step ID associated with this request for logging purposes
+            actor: The user associated with this request for logging purposes
+        """
+        if step_id is None or actor is None or not settings.track_provider_trace:
+            return
+
+        safe_create_task(
+            self.telemetry_manager.create_provider_trace_async(
+                actor=actor,
+                provider_trace_create=ProviderTraceCreate(
+                    request_json=self.request_data,
+                    response_json={
+                        "content": {
+                            "tool_call": self.tool_call.model_dump_json() if self.tool_call else None,
+                            # "reasoning": [content.model_dump_json() for content in self.reasoning_content],
+                            # NOTE: different
+                            # TODO potentially split this into both content and reasoning? 
+ "content": [content.model_dump_json() for content in self.content], + }, + "id": self.interface.message_id, + "model": self.interface.model, + "role": "assistant", + # "stop_reason": "", + # "stop_sequence": None, + "type": "message", + "usage": { + "input_tokens": self.usage.prompt_tokens, + "output_tokens": self.usage.completion_tokens, + }, + }, + step_id=step_id, # Use original step_id for telemetry + organization_id=actor.organization_id, + ), + ), + label="create_provider_trace", + ) diff --git a/letta/agents/letta_agent_v3.py b/letta/agents/letta_agent_v3.py index 8aab1e98..064364ed 100644 --- a/letta/agents/letta_agent_v3.py +++ b/letta/agents/letta_agent_v3.py @@ -4,8 +4,9 @@ from typing import AsyncGenerator, Optional from opentelemetry.trace import Span from letta.adapters.letta_llm_adapter import LettaLLMAdapter -from letta.adapters.letta_llm_request_adapter import LettaLLMRequestAdapter, SimpleLettaLLMRequestAdapter -from letta.adapters.letta_llm_stream_adapter import SimpleLettaLLMStreamAdapter +from letta.adapters.letta_llm_request_adapter import LettaLLMRequestAdapter +from letta.adapters.simple_llm_request_adapter import SimpleLLMRequestAdapter +from letta.adapters.simple_llm_stream_adapter import SimpleLLMStreamAdapter from letta.agents.helpers import ( _build_rule_violation_result, _load_last_function_response, @@ -162,12 +163,12 @@ class LettaAgentV3(LettaAgentV2): first_chunk = True if stream_tokens: - llm_adapter = SimpleLettaLLMStreamAdapter( + llm_adapter = SimpleLLMStreamAdapter( llm_client=self.llm_client, llm_config=self.agent_state.llm_config, ) else: - llm_adapter = SimpleLettaLLMRequestAdapter( + llm_adapter = SimpleLLMRequestAdapter( llm_client=self.llm_client, llm_config=self.agent_state.llm_config, )
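
The letta_agent_v3.py hunk above is the only call site this patch touches: it picks the stream adapter when token streaming is requested and the blocking request adapter otherwise, then drains invoke_llm as an async generator. A minimal sketch of that pattern follows; run_step is a hypothetical driver, and llm_client, agent_state, request_data, messages, and tools are stand-ins for values LettaAgentV3 already has in scope.

    # Hypothetical driver mirroring the stream_tokens branch in letta_agent_v3.py.
    from letta.adapters.simple_llm_request_adapter import SimpleLLMRequestAdapter
    from letta.adapters.simple_llm_stream_adapter import SimpleLLMStreamAdapter


    async def run_step(llm_client, agent_state, request_data, messages, tools, stream_tokens: bool):
        if stream_tokens:
            # Streaming: yields LettaMessage chunks as they arrive (minimal TTFT).
            llm_adapter = SimpleLLMStreamAdapter(
                llm_client=llm_client,
                llm_config=agent_state.llm_config,
            )
        else:
            # Blocking: makes one request, then yields a single None.
            llm_adapter = SimpleLLMRequestAdapter(
                llm_client=llm_client,
                llm_config=agent_state.llm_config,
            )

        async for chunk in llm_adapter.invoke_llm(
            request_data=request_data,
            messages=messages,
            tools=tools,
            use_assistant_message=False,
        ):
            if chunk is not None:
                yield chunk  # forward streamed messages to the caller

        # After the generator is exhausted, accumulated state is available on
        # the adapter: llm_adapter.tool_call, llm_adapter.content, llm_adapter.usage.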
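One subtlety in simple_llm_stream_adapter.py worth calling out: the OpenAI branch selects between the Responses and Chat Completions streaming interfaces purely from payload shape. A toy illustration of that check, with hypothetical payloads:

    # Same expression as `use_responses` in SimpleLLMStreamAdapter.invoke_llm:
    # the OpenAI Responses API carries an "input" field, while Chat Completions
    # carries "messages". Payloads below are illustrative only.
    def uses_responses_api(request_data: dict) -> bool:
        return "input" in request_data and "messages" not in request_data


    responses_payload = {"model": "gpt-4o", "input": [{"role": "user", "content": "hi"}]}
    chat_payload = {"model": "gpt-4o", "messages": [{"role": "user", "content": "hi"}]}

    assert uses_responses_api(responses_payload)
    assert not uses_responses_api(chat_payload)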