From 647e271c2a54442e2416353186642dc61e3a96cc Mon Sep 17 00:00:00 2001 From: Kian Jones <11655409+kianjones9@users.noreply.github.com> Date: Tue, 2 Dec 2025 17:22:42 -0800 Subject: [PATCH] fix: add more logging for stream error (#6490) * trying out gpt-5.1-codex * add unit test for message content * try to support multimodal * remove ValueError and add logging on stream error * prevent stream termination from api spec implementation errors * fix: remove final_response references from non-Responses API interfaces * fix: add diagnostic attributes to SimpleOpenAIResponsesStreamingInterface * fix: remove final_response from SimpleOpenAIStreamingInterface (Chat Completions API) --- .../interfaces/openai_streaming_interface.py | 116 ++++++++++++++++-- 1 file changed, 108 insertions(+), 8 deletions(-) diff --git a/letta/interfaces/openai_streaming_interface.py b/letta/interfaces/openai_streaming_interface.py index 5b5580c9..65fdfd39 100644 --- a/letta/interfaces/openai_streaming_interface.py +++ b/letta/interfaces/openai_streaming_interface.py @@ -135,6 +135,11 @@ class OpenAIStreamingInterface: self.requires_approval_tools = requires_approval_tools + # Diagnostic: track last event for debugging + self.last_event_type: str | None = None + self.total_events_received: int = 0 + self.stream_was_cancelled: bool = False + def get_reasoning_content(self) -> list[TextContent | OmittedReasoningContent]: content = "".join(self.reasoning_messages).strip() @@ -223,7 +228,14 @@ class OpenAIStreamingInterface: except asyncio.CancelledError as e: import traceback - logger.info("Cancelled stream attempt but overriding %s: %s", e, traceback.format_exc()) + self.stream_was_cancelled = True + logger.warning( + "Stream was cancelled (CancelledError). Attempting to process current event. " + f"Events received so far: {self.total_events_received}, last event: {self.last_event_type}. 
" + f"Error: %s, trace: %s", + e, + traceback.format_exc(), + ) async for message in self._process_chunk(chunk, ttft_span, prev_message_type, message_index): new_message_type = message.message_type if new_message_type != prev_message_type: @@ -235,6 +247,13 @@ class OpenAIStreamingInterface: # Don't raise the exception here continue + # Stream iterator exited normally + logger.info( + f"Chat Completions stream iterator exited. " + f"Received {self.total_events_received} events, " + f"last event: {self.last_event_type}" + ) + except Exception as e: import traceback @@ -247,7 +266,12 @@ class OpenAIStreamingInterface: yield LettaStopReason(stop_reason=StopReasonType.error) raise e finally: - logger.info("OpenAIStreamingInterface: Stream processing complete.") + logger.info( + f"OpenAIStreamingInterface: Stream processing complete. " + f"Received {self.total_events_received} events, " + f"last event: {self.last_event_type}, " + f"stream was cancelled: {self.stream_was_cancelled}" + ) async def _process_chunk( self, @@ -256,6 +280,13 @@ class OpenAIStreamingInterface: prev_message_type: Optional[str] = None, message_index: int = 0, ) -> AsyncGenerator[LettaMessage | LettaStopReason, None]: + # Track events for diagnostics + self.total_events_received += 1 + self.last_event_type = "ChatCompletionChunk" + # Track events for diagnostics + self.total_events_received += 1 + self.last_event_type = "ChatCompletionChunk" + if not self.model or not self.message_id: self.model = chunk.model self.message_id = chunk.id @@ -550,6 +581,11 @@ class SimpleOpenAIStreamingInterface: # Raw usage from provider (for transparent logging in provider trace) self.raw_usage: dict | None = None + # Diagnostic: track last event for debugging + self.last_event_type: str | None = None + self.total_events_received: int = 0 + self.stream_was_cancelled: bool = False + # Fallback token counters (using tiktoken cl200k-base) self.fallback_input_tokens = 0 self.fallback_output_tokens = 0 @@ -674,7 +710,14 
@@ class SimpleOpenAIStreamingInterface: except asyncio.CancelledError as e: import traceback - logger.info("Cancelled stream attempt but overriding %s: %s", e, traceback.format_exc()) + self.stream_was_cancelled = True + logger.warning( + "Stream was cancelled (CancelledError). Attempting to process current event. " + f"Events received so far: {self.total_events_received}, last event: {self.last_event_type}. " + f"Error: %s, trace: %s", + e, + traceback.format_exc(), + ) async for message in self._process_chunk(chunk, ttft_span, prev_message_type, message_index): new_message_type = message.message_type if new_message_type != prev_message_type: @@ -686,6 +729,13 @@ class SimpleOpenAIStreamingInterface: # Don't raise the exception here continue + # Stream iterator exited normally + logger.info( + f"Chat Completions stream iterator exited (SimpleOpenAIStreamingInterface). " + f"Received {self.total_events_received} events, " + f"last event: {self.last_event_type}" + ) + except Exception as e: import traceback @@ -698,7 +748,12 @@ class SimpleOpenAIStreamingInterface: yield LettaStopReason(stop_reason=StopReasonType.error) raise e finally: - logger.info("OpenAIStreamingInterface: Stream processing complete.") + logger.info( + f"SimpleOpenAIStreamingInterface: Stream processing complete. 
" + f"Received {self.total_events_received} events, " + f"last event: {self.last_event_type}, " + f"stream was cancelled: {self.stream_was_cancelled}" + ) async def _process_chunk( self, @@ -893,6 +948,11 @@ class SimpleOpenAIResponsesStreamingInterface: # Raw usage from provider (for transparent logging in provider trace) self.raw_usage: dict | None = None + # Diagnostic: track last event for debugging + self.last_event_type: str | None = None + self.total_events_received: int = 0 + self.stream_was_cancelled: bool = False + # -------- Mapping helpers (no broad try/except) -------- def _record_tool_mapping(self, event: object, item: object) -> tuple[str | None, str | None, int | None, str | None]: """Record call_id/name mapping for this tool-call using output_index and item.id if present. @@ -924,7 +984,14 @@ class SimpleOpenAIResponsesStreamingInterface: def get_content(self) -> list[TextContent | SummarizedReasoningContent]: """This includes both SummarizedReasoningContent and TextContent""" if self.final_response is None: - raise ValueError("No final response available") + logger.warning( + "No final response available - stream may have been interrupted or ResponseCompletedEvent was not received. " + f"Diagnostic info: received {self.total_events_received} events total, " + f"last event type: {self.last_event_type}, " + f"stream was cancelled: {self.stream_was_cancelled}. " + "Returning empty content list." 
+ ) + return [] content = [] for response in self.final_response.output: @@ -996,6 +1063,7 @@ class SimpleOpenAIResponsesStreamingInterface: prev_message_type = None message_index = 0 try: + logger.info("Starting ResponsesAPI stream processing") async with stream: async for event in stream: try: @@ -1006,10 +1074,29 @@ class SimpleOpenAIResponsesStreamingInterface: message_index += 1 prev_message_type = new_message_type yield message + except (TypeError, AttributeError, KeyError, ValueError) as e: + # Event parsing/processing error - log and skip this event + import traceback + + logger.error( + f"Error processing event {type(event).__name__} at position {self.total_events_received}: {e}. " + f"Event data: {event if hasattr(event, '__dict__') else str(event)[:500]}. " + f"Skipping this event and continuing stream.", + exc_info=True, + ) + # Continue to next event rather than killing the stream + continue except asyncio.CancelledError as e: import traceback - logger.info("Cancelled stream attempt but overriding %s: %s", e, traceback.format_exc()) + self.stream_was_cancelled = True + logger.warning( + "Stream was cancelled (CancelledError). Attempting to process current event. " + f"Events received so far: {self.total_events_received}, last event: {self.last_event_type}. " + f"Error: %s, trace: %s", + e, + traceback.format_exc(), + ) async for message in self._process_event(event, ttft_span, prev_message_type, message_index): new_message_type = message.message_type if new_message_type != prev_message_type: @@ -1033,7 +1120,13 @@ class SimpleOpenAIResponsesStreamingInterface: yield LettaStopReason(stop_reason=StopReasonType.error) raise e finally: - logger.info("OpenAIStreamingInterface: Stream processing complete.") + logger.info( + f"ResponsesAPI Stream processing complete. 
" + f"Received {self.total_events_received} events, " + f"last event: {self.last_event_type}, " + f"has final_response: {self.final_response is not None}, " + f"stream was cancelled: {self.stream_was_cancelled}" + ) async def _process_event( self, @@ -1042,6 +1135,9 @@ class SimpleOpenAIResponsesStreamingInterface: prev_message_type: Optional[str] = None, message_index: int = 0, ) -> AsyncGenerator[LettaMessage | LettaStopReason, None]: + # Track events for diagnostics + self.total_events_received += 1 + self.last_event_type = type(event).__name__ if isinstance(event, ResponseCreatedEvent): # No-op, just had the input events return @@ -1338,7 +1434,11 @@ class SimpleOpenAIResponsesStreamingInterface: return else: - logger.debug(f"Unhandled event: {event}") + event_type = type(event).__name__ + logger.warning(f"Unhandled event type: {event_type}. Event details: {event if hasattr(event, '__dict__') else str(event)}") + # Check if this is an error event we should handle + if hasattr(event, "error") and event.error is not None: + logger.error(f"Stream error event received: {event.error}") return