diff --git a/letta/agents/base_agent.py b/letta/agents/base_agent.py index dcc0f2ec..d4e4e38f 100644 --- a/letta/agents/base_agent.py +++ b/letta/agents/base_agent.py @@ -13,6 +13,7 @@ from letta.schemas.letta_message import LegacyLettaMessage, LettaMessage from letta.schemas.letta_message_content import TextContent from letta.schemas.letta_response import LettaResponse from letta.schemas.message import Message, MessageCreate, MessageUpdate +from letta.schemas.usage import LettaUsageStatistics from letta.schemas.user import User from letta.services.agent_manager import AgentManager from letta.services.helpers.agent_manager_helper import compile_system_message @@ -136,3 +137,9 @@ class BaseAgent(ABC): except: logger.exception(f"Failed to rebuild memory for agent id={agent_state.id} and actor=({self.actor.id}, {self.actor.name})") raise + + def get_finish_chunks_for_stream(self, usage: LettaUsageStatistics): + return [ + usage.model_dump_json(), + MessageStreamStatus.done.value, + ] diff --git a/letta/agents/letta_agent.py b/letta/agents/letta_agent.py index 6d40704f..0620ccce 100644 --- a/letta/agents/letta_agent.py +++ b/letta/agents/letta_agent.py @@ -25,7 +25,7 @@ from letta.otel.context import get_ctx_attributes from letta.otel.metric_registry import MetricRegistry from letta.otel.tracing import log_event, trace_method, tracer from letta.schemas.agent import AgentState -from letta.schemas.enums import MessageRole, MessageStreamStatus +from letta.schemas.enums import MessageRole from letta.schemas.letta_message import MessageType from letta.schemas.letta_message_content import OmittedReasoningContent, ReasoningContent, RedactedReasoningContent, TextContent from letta.schemas.letta_response import LettaResponse @@ -262,9 +262,7 @@ class LettaAgent(BaseAgent): ) for message in letta_messages: - if not include_return_message_types: - yield f"data: {message.model_dump_json()}\n\n" - elif include_return_message_types and message.message_type in include_return_message_types: + if include_return_message_types is None or message.message_type in include_return_message_types: yield f"data: {message.model_dump_json()}\n\n" if not should_continue: @@ -288,8 +286,8 @@ class LettaAgent(BaseAgent): request_span.end() # Return back usage - yield f"data: {usage.model_dump_json()}\n\n" - yield f"data: {MessageStreamStatus.done.model_dump_json()}\n\n" + for finish_chunk in self.get_finish_chunks_for_stream(usage): + yield f"data: {finish_chunk}\n\n" async def _step( self, @@ -506,10 +504,7 @@ class LettaAgent(BaseAgent): request_span.add_event(name="time_to_first_token_ms", attributes={"ttft_ms": ns_to_ms(ttft_ns)}) first_chunk = False - if include_return_message_types is None: - # return all data - yield f"data: {chunk.model_dump_json()}\n\n" - elif include_return_message_types and chunk.message_type in include_return_message_types: + if include_return_message_types is None or chunk.message_type in include_return_message_types: # filter down returned data yield f"data: {chunk.model_dump_json()}\n\n" @@ -610,9 +605,8 @@ class LettaAgent(BaseAgent): request_span.add_event(name="letta_request_ms", attributes={"duration_ms": ns_to_ms(request_ns)}) request_span.end() - # TODO: Also yield out a letta usage stats SSE - yield f"data: {usage.model_dump_json()}\n\n" - yield f"data: {MessageStreamStatus.done.model_dump_json()}\n\n" + for finish_chunk in self.get_finish_chunks_for_stream(usage): + yield f"data: {finish_chunk}\n\n" async def _build_and_request_from_llm( self, diff --git 
a/letta/groups/sleeptime_multi_agent_v2.py b/letta/groups/sleeptime_multi_agent_v2.py index f0875941..97c9216b 100644 --- a/letta/groups/sleeptime_multi_agent_v2.py +++ b/letta/groups/sleeptime_multi_agent_v2.py @@ -144,7 +144,8 @@ class SleeptimeMultiAgentV2(BaseAgent): for message in response.messages: yield f"data: {message.model_dump_json()}\n\n" - yield f"data: {response.usage.model_dump_json()}\n\n" + for finish_chunk in self.get_finish_chunks_for_stream(response.usage): + yield f"data: {finish_chunk}\n\n" @trace_method async def step_stream( diff --git a/letta/interfaces/anthropic_streaming_interface.py b/letta/interfaces/anthropic_streaming_interface.py index 78bac414..e4f33133 100644 --- a/letta/interfaces/anthropic_streaming_interface.py +++ b/letta/interfaces/anthropic_streaming_interface.py @@ -90,6 +90,8 @@ class AnthropicStreamingInterface: def get_tool_call_object(self) -> ToolCall: """Useful for agent loop""" + if not self.tool_call_name: + raise ValueError("No tool call returned") # hack for tool rules try: tool_input = json.loads(self.accumulated_tool_call_args) diff --git a/letta/interfaces/openai_streaming_interface.py b/letta/interfaces/openai_streaming_interface.py index 7417be50..3020e7a4 100644 --- a/letta/interfaces/openai_streaming_interface.py +++ b/letta/interfaces/openai_streaming_interface.py @@ -6,6 +6,7 @@ from openai.types.chat.chat_completion_chunk import ChatCompletionChunk from letta.constants import DEFAULT_MESSAGE_TOOL, DEFAULT_MESSAGE_TOOL_KWARG from letta.helpers.datetime_helpers import get_utc_timestamp_ns, ns_to_ms +from letta.log import get_logger from letta.schemas.letta_message import AssistantMessage, LettaMessage, ReasoningMessage, ToolCallDelta, ToolCallMessage from letta.schemas.letta_message_content import TextContent from letta.schemas.message import Message @@ -13,6 +14,8 @@ from letta.schemas.openai.chat_completion_response import FunctionCall, ToolCall from letta.server.rest_api.json_parser import OptimisticJSONParser from letta.streaming_utils import JSONInnerThoughtsExtractor +logger = get_logger(__name__) + class OpenAIStreamingInterface: """ @@ -60,6 +63,8 @@ class OpenAIStreamingInterface: def get_tool_call_object(self) -> ToolCall: """Useful for agent loop""" function_name = self.last_flushed_function_name if self.last_flushed_function_name else self.function_name_buffer + if not function_name: + raise ValueError("No tool call ID available") tool_call_id = self.last_flushed_function_id if self.last_flushed_function_id else self.function_id_buffer if not tool_call_id: raise ValueError("No tool call ID available") @@ -79,254 +84,259 @@ class OpenAIStreamingInterface: It also collects tokens and detects if a tool call is triggered. 
""" first_chunk = True + try: + async with stream: + prev_message_type = None + message_index = 0 + async for chunk in stream: + if first_chunk and ttft_span is not None and provider_request_start_timestamp_ns is not None: + now = get_utc_timestamp_ns() + ttft_ns = now - provider_request_start_timestamp_ns + ttft_span.add_event( + name="openai_time_to_first_token_ms", attributes={"openai_time_to_first_token_ms": ns_to_ms(ttft_ns)} + ) + first_chunk = False - async with stream: - prev_message_type = None - message_index = 0 - async for chunk in stream: - if first_chunk and ttft_span is not None and provider_request_start_timestamp_ns is not None: - now = get_utc_timestamp_ns() - ttft_ns = now - provider_request_start_timestamp_ns - ttft_span.add_event( - name="openai_time_to_first_token_ms", attributes={"openai_time_to_first_token_ms": ns_to_ms(ttft_ns)} - ) - first_chunk = False + if not self.model or not self.message_id: + self.model = chunk.model + self.message_id = chunk.id - if not self.model or not self.message_id: - self.model = chunk.model - self.message_id = chunk.id + # track usage + if chunk.usage: + self.input_tokens += chunk.usage.prompt_tokens + self.output_tokens += chunk.usage.completion_tokens - # track usage - if chunk.usage: - self.input_tokens += chunk.usage.prompt_tokens - self.output_tokens += chunk.usage.completion_tokens + if chunk.choices: + choice = chunk.choices[0] + message_delta = choice.delta - if chunk.choices: - choice = chunk.choices[0] - message_delta = choice.delta + if message_delta.tool_calls is not None and len(message_delta.tool_calls) > 0: + tool_call = message_delta.tool_calls[0] - if message_delta.tool_calls is not None and len(message_delta.tool_calls) > 0: - tool_call = message_delta.tool_calls[0] - - if tool_call.function.name: - # If we're waiting for the first key, then we should hold back the name - # ie add it to a buffer instead of returning it as a chunk - if self.function_name_buffer is None: - self.function_name_buffer = tool_call.function.name - else: - self.function_name_buffer += tool_call.function.name - - if tool_call.id: - # Buffer until next time - if self.function_id_buffer is None: - self.function_id_buffer = tool_call.id - else: - self.function_id_buffer += tool_call.id - - if tool_call.function.arguments: - # updates_main_json, updates_inner_thoughts = self.function_args_reader.process_fragment(tool_call.function.arguments) - self.current_function_arguments += tool_call.function.arguments - updates_main_json, updates_inner_thoughts = self.function_args_reader.process_fragment( - tool_call.function.arguments - ) - - # If we have inner thoughts, we should output them as a chunk - if updates_inner_thoughts: - if prev_message_type and prev_message_type != "reasoning_message": - message_index += 1 - self.reasoning_messages.append(updates_inner_thoughts) - reasoning_message = ReasoningMessage( - id=self.letta_message_id, - date=datetime.now(timezone.utc), - reasoning=updates_inner_thoughts, - # name=name, - otid=Message.generate_otid_from_id(self.letta_message_id, message_index), - ) - prev_message_type = reasoning_message.message_type - yield reasoning_message - - # Additionally inner thoughts may stream back with a chunk of main JSON - # In that case, since we can only return a chunk at a time, we should buffer it - if updates_main_json: - if self.function_args_buffer is None: - self.function_args_buffer = updates_main_json - else: - self.function_args_buffer += updates_main_json - - # If we have main_json, we should output a 
ToolCallMessage - elif updates_main_json: - - # If there's something in the function_name buffer, we should release it first - # NOTE: we could output it as part of a chunk that has both name and args, - # however the frontend may expect name first, then args, so to be - # safe we'll output name first in a separate chunk - if self.function_name_buffer: - - # use_assisitant_message means that we should also not release main_json raw, and instead should only release the contents of "message": "..." - if self.use_assistant_message and self.function_name_buffer == self.assistant_message_tool_name: - - # Store the ID of the tool call so allow skipping the corresponding response - if self.function_id_buffer: - self.prev_assistant_message_id = self.function_id_buffer - - else: - if prev_message_type and prev_message_type != "tool_call_message": - message_index += 1 - self.tool_call_name = str(self.function_name_buffer) - tool_call_msg = ToolCallMessage( - id=self.letta_message_id, - date=datetime.now(timezone.utc), - tool_call=ToolCallDelta( - name=self.function_name_buffer, - arguments=None, - tool_call_id=self.function_id_buffer, - ), - otid=Message.generate_otid_from_id(self.letta_message_id, message_index), - ) - prev_message_type = tool_call_msg.message_type - yield tool_call_msg - - # Record what the last function name we flushed was - self.last_flushed_function_name = self.function_name_buffer - if self.last_flushed_function_id is None: - self.last_flushed_function_id = self.function_id_buffer - # Clear the buffer - self.function_name_buffer = None - self.function_id_buffer = None - # Since we're clearing the name buffer, we should store - # any updates to the arguments inside a separate buffer - - # Add any main_json updates to the arguments buffer - if self.function_args_buffer is None: - self.function_args_buffer = updates_main_json - else: - self.function_args_buffer += updates_main_json - - # If there was nothing in the name buffer, we can proceed to - # output the arguments chunk as a ToolCallMessage + if tool_call.function.name: + # If we're waiting for the first key, then we should hold back the name + # ie add it to a buffer instead of returning it as a chunk + if self.function_name_buffer is None: + self.function_name_buffer = tool_call.function.name else: + self.function_name_buffer += tool_call.function.name - # use_assisitant_message means that we should also not release main_json raw, and instead should only release the contents of "message": "..." 
- if self.use_assistant_message and ( - self.last_flushed_function_name is not None - and self.last_flushed_function_name == self.assistant_message_tool_name - ): - # do an additional parse on the updates_main_json - if self.function_args_buffer: - updates_main_json = self.function_args_buffer + updates_main_json - self.function_args_buffer = None + if tool_call.id: + # Buffer until next time + if self.function_id_buffer is None: + self.function_id_buffer = tool_call.id + else: + self.function_id_buffer += tool_call.id - # Pretty gross hardcoding that assumes that if we're toggling into the keywords, we have the full prefix - match_str = '{"' + self.assistant_message_tool_kwarg + '":"' - if updates_main_json == match_str: - updates_main_json = None + if tool_call.function.arguments: + # updates_main_json, updates_inner_thoughts = self.function_args_reader.process_fragment(tool_call.function.arguments) + self.current_function_arguments += tool_call.function.arguments + updates_main_json, updates_inner_thoughts = self.function_args_reader.process_fragment( + tool_call.function.arguments + ) + # If we have inner thoughts, we should output them as a chunk + if updates_inner_thoughts: + if prev_message_type and prev_message_type != "reasoning_message": + message_index += 1 + self.reasoning_messages.append(updates_inner_thoughts) + reasoning_message = ReasoningMessage( + id=self.letta_message_id, + date=datetime.now(timezone.utc), + reasoning=updates_inner_thoughts, + # name=name, + otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + ) + prev_message_type = reasoning_message.message_type + yield reasoning_message + + # Additionally inner thoughts may stream back with a chunk of main JSON + # In that case, since we can only return a chunk at a time, we should buffer it + if updates_main_json: + if self.function_args_buffer is None: + self.function_args_buffer = updates_main_json else: - # Some hardcoding to strip off the trailing "}" - if updates_main_json in ["}", '"}']: - updates_main_json = None - if updates_main_json and len(updates_main_json) > 0 and updates_main_json[-1:] == '"': - updates_main_json = updates_main_json[:-1] + self.function_args_buffer += updates_main_json - if not updates_main_json: - # early exit to turn into content mode - continue + # If we have main_json, we should output a ToolCallMessage + elif updates_main_json: - # There may be a buffer from a previous chunk, for example - # if the previous chunk had arguments but we needed to flush name - if self.function_args_buffer: - # In this case, we should release the buffer + new data at once - combined_chunk = self.function_args_buffer + updates_main_json + # If there's something in the function_name buffer, we should release it first + # NOTE: we could output it as part of a chunk that has both name and args, + # however the frontend may expect name first, then args, so to be + # safe we'll output name first in a separate chunk + if self.function_name_buffer: - if prev_message_type and prev_message_type != "assistant_message": - message_index += 1 - assistant_message = AssistantMessage( - id=self.letta_message_id, - date=datetime.now(timezone.utc), - content=combined_chunk, - otid=Message.generate_otid_from_id(self.letta_message_id, message_index), - ) - prev_message_type = assistant_message.message_type - yield assistant_message - # Store the ID of the tool call so allow skipping the corresponding response - if self.function_id_buffer: - self.prev_assistant_message_id = self.function_id_buffer - # 
clear buffer - self.function_args_buffer = None - self.function_id_buffer = None - - else: - # If there's no buffer to clear, just output a new chunk with new data - # TODO: THIS IS HORRIBLE - # TODO: WE USE THE OLD JSON PARSER EARLIER (WHICH DOES NOTHING) AND NOW THE NEW JSON PARSER - # TODO: THIS IS TOTALLY WRONG AND BAD, BUT SAVING FOR A LARGER REWRITE IN THE NEAR FUTURE - parsed_args = self.optimistic_json_parser.parse(self.current_function_arguments) - - if parsed_args.get(self.assistant_message_tool_kwarg) and parsed_args.get( - self.assistant_message_tool_kwarg - ) != self.current_json_parse_result.get(self.assistant_message_tool_kwarg): - new_content = parsed_args.get(self.assistant_message_tool_kwarg) - prev_content = self.current_json_parse_result.get(self.assistant_message_tool_kwarg, "") - # TODO: Assumes consistent state and that prev_content is subset of new_content - diff = new_content.replace(prev_content, "", 1) - self.current_json_parse_result = parsed_args - if prev_message_type and prev_message_type != "assistant_message": - message_index += 1 - assistant_message = AssistantMessage( - id=self.letta_message_id, - date=datetime.now(timezone.utc), - content=diff, - # name=name, - otid=Message.generate_otid_from_id(self.letta_message_id, message_index), - ) - prev_message_type = assistant_message.message_type - yield assistant_message + # use_assisitant_message means that we should also not release main_json raw, and instead should only release the contents of "message": "..." + if self.use_assistant_message and self.function_name_buffer == self.assistant_message_tool_name: # Store the ID of the tool call so allow skipping the corresponding response if self.function_id_buffer: self.prev_assistant_message_id = self.function_id_buffer - # clear buffers - self.function_id_buffer = None - else: - # There may be a buffer from a previous chunk, for example - # if the previous chunk had arguments but we needed to flush name - if self.function_args_buffer: - # In this case, we should release the buffer + new data at once - combined_chunk = self.function_args_buffer + updates_main_json + else: if prev_message_type and prev_message_type != "tool_call_message": message_index += 1 + self.tool_call_name = str(self.function_name_buffer) tool_call_msg = ToolCallMessage( id=self.letta_message_id, date=datetime.now(timezone.utc), tool_call=ToolCallDelta( name=self.function_name_buffer, - arguments=combined_chunk, + arguments=None, tool_call_id=self.function_id_buffer, ), - # name=name, otid=Message.generate_otid_from_id(self.letta_message_id, message_index), ) prev_message_type = tool_call_msg.message_type yield tool_call_msg - # clear buffer - self.function_args_buffer = None - self.function_id_buffer = None + + # Record what the last function name we flushed was + self.last_flushed_function_name = self.function_name_buffer + if self.last_flushed_function_id is None: + self.last_flushed_function_id = self.function_id_buffer + # Clear the buffer + self.function_name_buffer = None + self.function_id_buffer = None + # Since we're clearing the name buffer, we should store + # any updates to the arguments inside a separate buffer + + # Add any main_json updates to the arguments buffer + if self.function_args_buffer is None: + self.function_args_buffer = updates_main_json else: - # If there's no buffer to clear, just output a new chunk with new data - if prev_message_type and prev_message_type != "tool_call_message": - message_index += 1 - tool_call_msg = ToolCallMessage( - 
id=self.letta_message_id, - date=datetime.now(timezone.utc), - tool_call=ToolCallDelta( - name=None, - arguments=updates_main_json, - tool_call_id=self.function_id_buffer, - ), - # name=name, - otid=Message.generate_otid_from_id(self.letta_message_id, message_index), - ) - prev_message_type = tool_call_msg.message_type - yield tool_call_msg - self.function_id_buffer = None + self.function_args_buffer += updates_main_json + + # If there was nothing in the name buffer, we can proceed to + # output the arguments chunk as a ToolCallMessage + else: + + # use_assisitant_message means that we should also not release main_json raw, and instead should only release the contents of "message": "..." + if self.use_assistant_message and ( + self.last_flushed_function_name is not None + and self.last_flushed_function_name == self.assistant_message_tool_name + ): + # do an additional parse on the updates_main_json + if self.function_args_buffer: + updates_main_json = self.function_args_buffer + updates_main_json + self.function_args_buffer = None + + # Pretty gross hardcoding that assumes that if we're toggling into the keywords, we have the full prefix + match_str = '{"' + self.assistant_message_tool_kwarg + '":"' + if updates_main_json == match_str: + updates_main_json = None + + else: + # Some hardcoding to strip off the trailing "}" + if updates_main_json in ["}", '"}']: + updates_main_json = None + if updates_main_json and len(updates_main_json) > 0 and updates_main_json[-1:] == '"': + updates_main_json = updates_main_json[:-1] + + if not updates_main_json: + # early exit to turn into content mode + continue + + # There may be a buffer from a previous chunk, for example + # if the previous chunk had arguments but we needed to flush name + if self.function_args_buffer: + # In this case, we should release the buffer + new data at once + combined_chunk = self.function_args_buffer + updates_main_json + + if prev_message_type and prev_message_type != "assistant_message": + message_index += 1 + assistant_message = AssistantMessage( + id=self.letta_message_id, + date=datetime.now(timezone.utc), + content=combined_chunk, + otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + ) + prev_message_type = assistant_message.message_type + yield assistant_message + # Store the ID of the tool call so allow skipping the corresponding response + if self.function_id_buffer: + self.prev_assistant_message_id = self.function_id_buffer + # clear buffer + self.function_args_buffer = None + self.function_id_buffer = None + + else: + # If there's no buffer to clear, just output a new chunk with new data + # TODO: THIS IS HORRIBLE + # TODO: WE USE THE OLD JSON PARSER EARLIER (WHICH DOES NOTHING) AND NOW THE NEW JSON PARSER + # TODO: THIS IS TOTALLY WRONG AND BAD, BUT SAVING FOR A LARGER REWRITE IN THE NEAR FUTURE + parsed_args = self.optimistic_json_parser.parse(self.current_function_arguments) + + if parsed_args.get(self.assistant_message_tool_kwarg) and parsed_args.get( + self.assistant_message_tool_kwarg + ) != self.current_json_parse_result.get(self.assistant_message_tool_kwarg): + new_content = parsed_args.get(self.assistant_message_tool_kwarg) + prev_content = self.current_json_parse_result.get(self.assistant_message_tool_kwarg, "") + # TODO: Assumes consistent state and that prev_content is subset of new_content + diff = new_content.replace(prev_content, "", 1) + self.current_json_parse_result = parsed_args + if prev_message_type and prev_message_type != "assistant_message": + message_index += 1 + 
assistant_message = AssistantMessage( + id=self.letta_message_id, + date=datetime.now(timezone.utc), + content=diff, + # name=name, + otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + ) + prev_message_type = assistant_message.message_type + yield assistant_message + + # Store the ID of the tool call so allow skipping the corresponding response + if self.function_id_buffer: + self.prev_assistant_message_id = self.function_id_buffer + # clear buffers + self.function_id_buffer = None + else: + + # There may be a buffer from a previous chunk, for example + # if the previous chunk had arguments but we needed to flush name + if self.function_args_buffer: + # In this case, we should release the buffer + new data at once + combined_chunk = self.function_args_buffer + updates_main_json + if prev_message_type and prev_message_type != "tool_call_message": + message_index += 1 + tool_call_msg = ToolCallMessage( + id=self.letta_message_id, + date=datetime.now(timezone.utc), + tool_call=ToolCallDelta( + name=self.function_name_buffer, + arguments=combined_chunk, + tool_call_id=self.function_id_buffer, + ), + # name=name, + otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + ) + prev_message_type = tool_call_msg.message_type + yield tool_call_msg + # clear buffer + self.function_args_buffer = None + self.function_id_buffer = None + else: + # If there's no buffer to clear, just output a new chunk with new data + if prev_message_type and prev_message_type != "tool_call_message": + message_index += 1 + tool_call_msg = ToolCallMessage( + id=self.letta_message_id, + date=datetime.now(timezone.utc), + tool_call=ToolCallDelta( + name=None, + arguments=updates_main_json, + tool_call_id=self.function_id_buffer, + ), + # name=name, + otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + ) + prev_message_type = tool_call_msg.message_type + yield tool_call_msg + self.function_id_buffer = None + except Exception as e: + logger.error("Error processing stream: %s", e) + raise + finally: + logger.info("OpenAIStreamingInterface: Stream processing complete.")
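
The new `BaseAgent.get_finish_chunks_for_stream` helper centralizes how the streaming agents touched by this diff terminate their SSE responses: one chunk carrying the serialized usage statistics, then the done status marker, each framed as its own `data:` event. A minimal sketch of that pattern, with `UsageStats` and `StreamStatus` as illustrative stand-ins for `LettaUsageStatistics` and `MessageStreamStatus` (the `"[DONE]"` sentinel value is an assumption):

```python
# Minimal sketch of the finish-chunk pattern, not the Letta implementation.
from enum import Enum
from typing import AsyncIterator, List

from pydantic import BaseModel


class StreamStatus(str, Enum):
    done = "[DONE]"  # stand-in sentinel; the real MessageStreamStatus value may differ


class UsageStats(BaseModel):
    prompt_tokens: int = 0
    completion_tokens: int = 0
    total_tokens: int = 0


def get_finish_chunks_for_stream(usage: UsageStats) -> List[str]:
    # One JSON usage payload, then the terminal status value.
    return [usage.model_dump_json(), StreamStatus.done.value]


async def finish_sse(usage: UsageStats) -> AsyncIterator[str]:
    # Each finish chunk is framed as its own SSE "data:" event, as the
    # agent generators in the diff do.
    for finish_chunk in get_finish_chunks_for_stream(usage):
        yield f"data: {finish_chunk}\n\n"
```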
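
The two-branch message-type filters in `letta_agent.py` collapse into a single predicate: emit the chunk when no filter was requested, or when its type is in the allow-list. Note that an explicit empty list now suppresses every message type, whereas the earlier `not include_return_message_types` check in the first hunk treated an empty list the same as no filter. A small sketch under assumed names:

```python
# Sketch of the collapsed filter condition; names are illustrative.
from typing import List, Optional


def should_emit(message_type: str, include_return_message_types: Optional[List[str]]) -> bool:
    # None means "no filter"; otherwise only allow-listed message types pass.
    return include_return_message_types is None or message_type in include_return_message_types


assert should_emit("assistant_message", None)                       # no filter: everything passes
assert should_emit("assistant_message", ["assistant_message"])      # allow-listed
assert not should_emit("reasoning_message", ["assistant_message"])  # filtered out
assert not should_emit("assistant_message", [])                     # empty list now filters everything
```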
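
Both streaming interfaces now fail fast inside `get_tool_call_object` when the provider stream never produced a tool call, rather than building a malformed `ToolCall` later. (In the OpenAI interface the new guard checks the function name but reuses the "No tool call ID available" message; a name-specific message would read better.) A simplified sketch of the guard, where `ToolCallAccumulator` and its fields are stand-ins for the interfaces' state and the empty-dict fallback on a parse failure is illustrative rather than Letta's behavior:

```python
# Simplified sketch of the fail-fast guards added in this diff.
import json
from typing import Optional


class ToolCallAccumulator:
    def __init__(self) -> None:
        self.tool_call_name: Optional[str] = None
        self.tool_call_id: Optional[str] = None
        self.accumulated_tool_call_args: str = ""

    def get_tool_call_object(self) -> dict:
        if not self.tool_call_name:
            raise ValueError("No tool call returned")
        if not self.tool_call_id:
            raise ValueError("No tool call ID available")
        try:
            tool_input = json.loads(self.accumulated_tool_call_args)
        except json.JSONDecodeError:
            tool_input = {}  # illustrative fallback for incomplete streamed JSON
        return {"id": self.tool_call_id, "name": self.tool_call_name, "arguments": tool_input}
```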
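
The large OpenAI-interface hunk keeps the name-before-arguments ordering described in its comments: the streamed tool-call name is held in a buffer and flushed as its own chunk the first time argument fragments arrive, because the frontend may expect the name before the arguments. A heavily simplified sketch of that buffering, with hypothetical types:

```python
# Sketch of the name-first flush; ToolCallChunk and NameFirstBuffer are illustrative.
from dataclasses import dataclass
from typing import Iterator, Optional


@dataclass
class ToolCallChunk:
    name: Optional[str] = None
    arguments: Optional[str] = None


class NameFirstBuffer:
    def __init__(self) -> None:
        self.name_buffer: Optional[str] = None

    def on_name_fragment(self, fragment: str) -> None:
        # Hold the name back instead of emitting it immediately.
        self.name_buffer = (self.name_buffer or "") + fragment

    def on_args_fragment(self, fragment: str) -> Iterator[ToolCallChunk]:
        if self.name_buffer:
            # Flush the buffered name as its own chunk before any arguments.
            yield ToolCallChunk(name=self.name_buffer)
            self.name_buffer = None
        yield ToolCallChunk(arguments=fragment)
```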
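
When `use_assistant_message` is enabled, the interface re-parses the accumulated (possibly partial) tool-call arguments with `OptimisticJSONParser` and emits only the newly arrived suffix of the `message` value. A sketch of that delta computation, using plain dicts as stand-ins for the parser's output and carrying the same assumption the TODO in the diff flags (the previous content is a prefix of the new content):

```python
# Sketch of the incremental assistant-message delta computed in the diff.
def message_delta(prev_parse: dict, new_parse: dict, kwarg: str = "message") -> str:
    new_content = new_parse.get(kwarg) or ""
    prev_content = prev_parse.get(kwarg) or ""
    # Assumes prev_content is a subset/prefix of new_content, as the TODO notes.
    return new_content.replace(prev_content, "", 1)


assert message_delta({}, {"message": "Hel"}) == "Hel"
assert message_delta({"message": "Hel"}, {"message": "Hello, wor"}) == "lo, wor"
```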
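
Finally, the bulk of the OpenAI-interface diff wraps the chunk-processing loop in try/except/finally so that stream failures are logged and re-raised instead of silently tearing down the generator, and completion is always logged. The shape of that wrapper for an async generator, with a plain `AsyncIterator` standing in for the SDK's stream object (the real code also enters the stream with `async with` and does all the parsing and buffering inside the loop):

```python
# Sketch of the error-handling wrapper; the per-chunk work is elided.
import logging
from typing import AsyncIterator

logger = logging.getLogger(__name__)


async def process_stream(stream: AsyncIterator[str]) -> AsyncIterator[str]:
    try:
        async for chunk in stream:
            # ...parse/buffer the chunk and yield LettaMessage objects here...
            yield chunk
    except Exception as e:
        logger.error("Error processing stream: %s", e)
        raise
    finally:
        logger.info("Stream processing complete.")
```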