feat: introduce agent loop v3 (#4435)
This commit is contained in:
55
letta/adapters/letta_llm_adapter.py
Normal file
55
letta/adapters/letta_llm_adapter.py
Normal file
@@ -0,0 +1,55 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from letta.llm_api.llm_client_base import LLMClientBase
|
||||
from letta.schemas.letta_message import LettaMessage
|
||||
from letta.schemas.letta_message_content import ReasoningContent, RedactedReasoningContent, TextContent
|
||||
from letta.schemas.llm_config import LLMConfig
|
||||
from letta.schemas.openai.chat_completion_response import ChatCompletionResponse, ToolCall
|
||||
from letta.schemas.usage import LettaUsageStatistics
|
||||
|
||||
|
||||
class LettaLLMAdapter(ABC):
    """
    Abstract base for the agent's LLM call adapters.

    Concrete subclasses implement either blocking or token-streaming execution
    behind one async-generator interface, so the agent loop can drive any
    execution mode through the same API.
    """

    def __init__(self, llm_client: LLMClientBase, llm_config: LLMConfig):
        # Client and configuration used to issue the actual provider call.
        self.llm_client: LLMClientBase = llm_client
        self.llm_config: LLMConfig = llm_config
        # Per-call results, populated by invoke_llm implementations.
        self.message_id: str | None = None
        self.request_data: dict | None = None
        self.response_data: dict | None = None
        self.chat_completions_response: ChatCompletionResponse | None = None
        self.reasoning_content: list[TextContent | ReasoningContent | RedactedReasoningContent] | None = None
        self.tool_call: ToolCall | None = None
        self.usage: LettaUsageStatistics = LettaUsageStatistics()

    @abstractmethod
    async def invoke_llm(
        self,
        request_data: dict,
        messages: list,
        tools: list,
        use_assistant_message: bool,
    ) -> AsyncGenerator[LettaMessage, None]:
        """
        Run the LLM call and yield results as they become available.

        Args:
            request_data: Prepared request payload for the LLM API.
            messages: The messages in context for the request.
            tools: The tools available for the LLM to use.
            use_assistant_message: If true, use assistant messages when
                streaming the response.

        Yields:
            LettaMessage: Chunks of data for streaming adapters, or None for
                blocking adapters.
        """
        raise NotImplementedError

    def supports_token_streaming(self) -> bool:
        # Blocking adapters keep this default; streaming subclasses override it.
        return False
|
||||
74
letta/adapters/letta_llm_request_adapter.py
Normal file
74
letta/adapters/letta_llm_request_adapter.py
Normal file
@@ -0,0 +1,74 @@
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from letta.adapters.letta_llm_adapter import LettaLLMAdapter
|
||||
from letta.schemas.letta_message import LettaMessage
|
||||
from letta.schemas.letta_message_content import OmittedReasoningContent, ReasoningContent, TextContent
|
||||
|
||||
|
||||
class LettaLLMRequestAdapter(LettaLLMAdapter):
    """
    Adapter for blocking (non-streaming) LLM requests.

    Issues a single request to the LLM and waits for the complete response,
    then records reasoning content, the first tool call, and usage statistics
    on the adapter instance for the agent to read.
    """

    async def invoke_llm(
        self,
        request_data: dict,
        messages: list,
        tools: list,
        use_assistant_message: bool,
    ) -> AsyncGenerator[LettaMessage, None]:
        """
        Perform one blocking LLM request and surface its results.

        Steps: store the request payload, await the full response, convert it
        to chat-completion form, then populate reasoning content, tool call,
        and usage on the instance. Nothing is streamed; a single None is
        yielded to satisfy the generator interface.
        """
        self.request_data = request_data

        # Blocking call: wait for the complete response from the provider.
        self.response_data = await self.llm_client.request_async(request_data, self.llm_config)
        self.chat_completions_response = self.llm_client.convert_response_to_chat_completion(self.response_data, messages, self.llm_config)

        message = self.chat_completions_response.choices[0].message

        # Pick whichever reasoning representation the response actually carries.
        if message.reasoning_content:
            self.reasoning_content = [
                ReasoningContent(
                    reasoning=message.reasoning_content,
                    is_native=True,
                    signature=message.reasoning_content_signature,
                )
            ]
        elif message.omitted_reasoning_content:
            self.reasoning_content = [OmittedReasoningContent()]
        elif message.content:
            # Reasoning placed into content for legacy reasons
            self.reasoning_content = [TextContent(text=message.content)]
        else:
            self.reasoning_content = None

        # Only the first tool call (if any) is surfaced to the agent.
        self.tool_call = message.tool_calls[0] if message.tool_calls else None

        # Usage statistics for this single step.
        usage = self.chat_completions_response.usage
        self.usage.step_count = 1
        self.usage.completion_tokens = usage.completion_tokens
        self.usage.prompt_tokens = usage.prompt_tokens
        self.usage.total_tokens = usage.total_tokens

        yield None
        return
|
||||
113
letta/adapters/letta_llm_stream_adapter.py
Normal file
113
letta/adapters/letta_llm_stream_adapter.py
Normal file
@@ -0,0 +1,113 @@
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from letta.adapters.letta_llm_adapter import LettaLLMAdapter
|
||||
from letta.interfaces.anthropic_streaming_interface import AnthropicStreamingInterface
|
||||
from letta.interfaces.openai_streaming_interface import OpenAIStreamingInterface
|
||||
from letta.llm_api.llm_client_base import LLMClientBase
|
||||
from letta.schemas.enums import ProviderType
|
||||
from letta.schemas.letta_message import LettaMessage
|
||||
from letta.schemas.llm_config import LLMConfig
|
||||
from letta.schemas.usage import LettaUsageStatistics
|
||||
|
||||
|
||||
class LettaLLMStreamAdapter(LettaLLMAdapter):
    """
    Adapter for handling streaming LLM requests with immediate token yielding.

    This adapter supports real-time streaming of tokens from the LLM, providing
    minimal time-to-first-token (TTFT) latency. It uses specialized streaming
    interfaces for different providers (OpenAI, Anthropic) to handle their
    specific streaming formats.
    """

    def __init__(self, llm_client: LLMClientBase, llm_config: LLMConfig):
        super().__init__(llm_client, llm_config)
        # Provider-specific streaming interface; created in invoke_llm because
        # its construction depends on per-request options.
        self.interface: OpenAIStreamingInterface | AnthropicStreamingInterface | None = None

    async def invoke_llm(
        self,
        request_data: dict,
        messages: list,
        tools: list,
        use_assistant_message: bool,
    ) -> AsyncGenerator[LettaMessage, None]:
        """
        Execute a streaming LLM request and yield tokens/chunks as they arrive.

        Args:
            request_data: The prepared request payload for the LLM API.
            messages: The messages in context for the request.
            tools: The tools available for the LLM to use.
            use_assistant_message: If true, use assistant messages when streaming.

        Yields:
            LettaMessage: Chunks produced by the provider streaming interface.

        Raises:
            ValueError: If the configured provider does not support streaming.
        """
        # Store request data
        self.request_data = request_data

        # Instantiate the provider-specific streaming interface.
        if self.llm_config.model_endpoint_type in [ProviderType.anthropic, ProviderType.bedrock]:
            self.interface = AnthropicStreamingInterface(
                use_assistant_message=use_assistant_message,
                put_inner_thoughts_in_kwarg=self.llm_config.put_inner_thoughts_in_kwargs,
            )
        elif self.llm_config.model_endpoint_type == ProviderType.openai:
            self.interface = OpenAIStreamingInterface(
                use_assistant_message=use_assistant_message,
                is_openai_proxy=self.llm_config.provider_name == "lmstudio_openai",
                put_inner_thoughts_in_kwarg=self.llm_config.put_inner_thoughts_in_kwargs,
                messages=messages,
                tools=tools,
            )
        else:
            raise ValueError(f"Streaming not supported for provider {self.llm_config.model_endpoint_type}")

        # Start the streaming request.
        stream = await self.llm_client.stream_async(request_data, self.llm_config)

        # Yield each chunk immediately as it arrives for minimal TTFT.
        async for chunk in self.interface.process(stream):  # TODO: add ttft span
            yield chunk

        # After streaming completes, extract the accumulated data.
        try:
            self.tool_call = self.interface.get_tool_call_object()
        except ValueError:
            # No tool call was produced; callers handle the None upstream.
            self.tool_call = None

        self.reasoning_content = self.interface.get_reasoning_content()
        self.usage = self._extract_usage()

        # Store any additional data from the interface.
        self.message_id = self.interface.letta_message_id

    def _extract_usage(self) -> LettaUsageStatistics:
        """Build usage statistics from the interface, falling back gracefully for providers that omit usage in streams."""
        if not (hasattr(self.interface, "input_tokens") and hasattr(self.interface, "output_tokens")):
            # Usage is unavailable for this interface; report zeros for one step.
            return LettaUsageStatistics(step_count=1, completion_tokens=0, prompt_tokens=0, total_tokens=0)

        # Handle cases where tokens might not be set (e.g., LMStudio).
        input_tokens = self.interface.input_tokens
        output_tokens = self.interface.output_tokens

        # Fall back to estimated values if the provider did not report them.
        if not input_tokens and hasattr(self.interface, "fallback_input_tokens"):
            input_tokens = self.interface.fallback_input_tokens
        if not output_tokens and hasattr(self.interface, "fallback_output_tokens"):
            output_tokens = self.interface.fallback_output_tokens

        return LettaUsageStatistics(
            step_count=1,
            completion_tokens=output_tokens or 0,
            prompt_tokens=input_tokens or 0,
            total_tokens=(input_tokens or 0) + (output_tokens or 0),
        )

    def supports_token_streaming(self) -> bool:
        return True
|
||||
72
letta/agents/base_agent_v2.py
Normal file
72
letta/agents/base_agent_v2.py
Normal file
@@ -0,0 +1,72 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from letta.constants import DEFAULT_MAX_STEPS
|
||||
from letta.log import get_logger
|
||||
from letta.schemas.agent import AgentState
|
||||
from letta.schemas.enums import MessageStreamStatus
|
||||
from letta.schemas.letta_message import LegacyLettaMessage, LettaMessage
|
||||
from letta.schemas.letta_response import LettaResponse
|
||||
from letta.schemas.message import MessageCreate
|
||||
from letta.schemas.user import User
|
||||
|
||||
|
||||
class BaseAgentV2(ABC):
    """
    Abstract base class for the letta agent loop, handling message management,
    llm api request, tool execution, and context tracking.
    """

    def __init__(self, agent_state: AgentState, actor: User):
        # Agent state and acting user drive every request made by subclasses.
        self.agent_state = agent_state
        self.actor = actor
        self.logger = get_logger(agent_state.id)

    @abstractmethod
    async def build_request(
        self,
        input_messages: list[MessageCreate],
    ) -> dict:
        """
        Build and return the request payload for the llm api from the given
        input messages, without executing the agent loop.
        """
        raise NotImplementedError

    @abstractmethod
    async def step(
        self,
        input_messages: list[MessageCreate],
        max_steps: int = DEFAULT_MAX_STEPS,
    ) -> LettaResponse:
        """
        Main execution loop for the agent. This method only returns once the agent completes
        execution, returning all messages at once.
        """
        raise NotImplementedError

    @abstractmethod
    async def stream_steps(
        self,
        input_messages: list[MessageCreate],
        max_steps: int = DEFAULT_MAX_STEPS,
    ) -> AsyncGenerator[LettaMessage | LegacyLettaMessage | MessageStreamStatus, None]:
        """
        Main execution loop for the agent. This method returns an async generator, streaming
        each step as it completes on the server side.
        """
        raise NotImplementedError

    @abstractmethod
    async def stream_tokens(
        self,
        input_messages: list[MessageCreate],
        max_steps: int = DEFAULT_MAX_STEPS,
    ) -> AsyncGenerator[LettaMessage | LegacyLettaMessage | MessageStreamStatus, None]:
        """
        Main execution loop for the agent. This method returns an async generator, streaming
        each token as it is returned from the underlying llm api. Not all llm providers offer
        native token streaming functionality; in these cases, this api streams back steps
        rather than individual tokens.
        """
        raise NotImplementedError
|
||||
1020
letta/agents/letta_agent_v2.py
Normal file
1020
letta/agents/letta_agent_v2.py
Normal file
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user