From 871e171b4475dd5ecca0ec38ca4965216e500fab Mon Sep 17 00:00:00 2001 From: cthomas Date: Tue, 27 May 2025 16:20:05 -0700 Subject: [PATCH] feat: add tracing to streaming interface (#2477) --- letta/interfaces/anthropic_streaming_interface.py | 2 ++ letta/interfaces/openai_streaming_interface.py | 2 ++ letta/services/helpers/agent_manager_helper.py | 2 +- 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/letta/interfaces/anthropic_streaming_interface.py b/letta/interfaces/anthropic_streaming_interface.py index 1a8aa220..6fb0ae34 100644 --- a/letta/interfaces/anthropic_streaming_interface.py +++ b/letta/interfaces/anthropic_streaming_interface.py @@ -37,6 +37,7 @@ from letta.schemas.letta_message_content import ReasoningContent, RedactedReason from letta.schemas.message import Message from letta.schemas.openai.chat_completion_response import FunctionCall, ToolCall from letta.server.rest_api.json_parser import JSONParser, PydanticJSONParser +from letta.tracing import trace_method logger = get_logger(__name__) @@ -115,6 +116,7 @@ class AnthropicStreamingInterface: logger.error("Error checking inner thoughts: %s", e) raise + @trace_method async def process(self, stream: AsyncStream[BetaRawMessageStreamEvent]) -> AsyncGenerator[LettaMessage, None]: prev_message_type = None message_index = 0 diff --git a/letta/interfaces/openai_streaming_interface.py b/letta/interfaces/openai_streaming_interface.py index 212817e0..3edde5ec 100644 --- a/letta/interfaces/openai_streaming_interface.py +++ b/letta/interfaces/openai_streaming_interface.py @@ -11,6 +11,7 @@ from letta.schemas.message import Message from letta.schemas.openai.chat_completion_response import FunctionCall, ToolCall from letta.server.rest_api.json_parser import OptimisticJSONParser from letta.streaming_utils import JSONInnerThoughtsExtractor +from letta.tracing import trace_method class OpenAIStreamingInterface: @@ -64,6 +65,7 @@ class OpenAIStreamingInterface: function=FunctionCall(arguments=self.current_function_arguments, name=function_name), ) + @trace_method async def process(self, stream: AsyncStream[ChatCompletionChunk]) -> AsyncGenerator[LettaMessage, None]: """ Iterates over the OpenAI stream, yielding SSE events. diff --git a/letta/services/helpers/agent_manager_helper.py b/letta/services/helpers/agent_manager_helper.py index 0b20e31b..23146bbc 100644 --- a/letta/services/helpers/agent_manager_helper.py +++ b/letta/services/helpers/agent_manager_helper.py @@ -1,7 +1,7 @@ import datetime from typing import List, Literal, Optional -from sqlalchemy import and_, asc, desc, func, literal, or_, select +from sqlalchemy import and_, asc, desc, or_, select from sqlalchemy.sql.expression import exists from letta import system