fix: add more logging for stream error (#6490)

* trying tout gpt-5.1-codex

* add unit test for message content

* try to support multimodal

* remove ValueError and add logging on stream error

* prevent stream termination from api spec implementation errors

* fix: remove final_response references from non-Responses API interfaces

* fix: add diagnostic attributes to SimpleOpenAIResponsesStreamingInterface

* fix: remove final_response from SimpleOpenAIStreamingInterface (Chat Completions API)
This commit is contained in:
Kian Jones
2025-12-02 17:22:42 -08:00
committed by Caren Thomas
parent 5165d60881
commit 647e271c2a

View File

@@ -135,6 +135,11 @@ class OpenAIStreamingInterface:
self.requires_approval_tools = requires_approval_tools
# Diagnostic: track last event for debugging
self.last_event_type: str | None = None
self.total_events_received: int = 0
self.stream_was_cancelled: bool = False
def get_reasoning_content(self) -> list[TextContent | OmittedReasoningContent]:
content = "".join(self.reasoning_messages).strip()
@@ -223,7 +228,14 @@ class OpenAIStreamingInterface:
except asyncio.CancelledError as e:
import traceback
logger.info("Cancelled stream attempt but overriding %s: %s", e, traceback.format_exc())
self.stream_was_cancelled = True
logger.warning(
"Stream was cancelled (CancelledError). Attempting to process current event. "
f"Events received so far: {self.total_events_received}, last event: {self.last_event_type}. "
f"Error: %s, trace: %s",
e,
traceback.format_exc(),
)
async for message in self._process_chunk(chunk, ttft_span, prev_message_type, message_index):
new_message_type = message.message_type
if new_message_type != prev_message_type:
@@ -235,6 +247,13 @@ class OpenAIStreamingInterface:
# Don't raise the exception here
continue
# Stream iterator exited normally
logger.info(
f"Chat Completions stream iterator exited. "
f"Received {self.total_events_received} events, "
f"last event: {self.last_event_type}"
)
except Exception as e:
import traceback
@@ -247,7 +266,12 @@ class OpenAIStreamingInterface:
yield LettaStopReason(stop_reason=StopReasonType.error)
raise e
finally:
logger.info("OpenAIStreamingInterface: Stream processing complete.")
logger.info(
f"OpenAIStreamingInterface: Stream processing complete. "
f"Received {self.total_events_received} events, "
f"last event: {self.last_event_type}, "
f"stream was cancelled: {self.stream_was_cancelled}"
)
async def _process_chunk(
self,
@@ -256,6 +280,13 @@ class OpenAIStreamingInterface:
prev_message_type: Optional[str] = None,
message_index: int = 0,
) -> AsyncGenerator[LettaMessage | LettaStopReason, None]:
# Track events for diagnostics
self.total_events_received += 1
self.last_event_type = "ChatCompletionChunk"
# Track events for diagnostics
self.total_events_received += 1
self.last_event_type = "ChatCompletionChunk"
if not self.model or not self.message_id:
self.model = chunk.model
self.message_id = chunk.id
@@ -550,6 +581,11 @@ class SimpleOpenAIStreamingInterface:
# Raw usage from provider (for transparent logging in provider trace)
self.raw_usage: dict | None = None
# Diagnostic: track last event for debugging
self.last_event_type: str | None = None
self.total_events_received: int = 0
self.stream_was_cancelled: bool = False
# Fallback token counters (using tiktoken cl200k-base)
self.fallback_input_tokens = 0
self.fallback_output_tokens = 0
@@ -674,7 +710,14 @@ class SimpleOpenAIStreamingInterface:
except asyncio.CancelledError as e:
import traceback
logger.info("Cancelled stream attempt but overriding %s: %s", e, traceback.format_exc())
self.stream_was_cancelled = True
logger.warning(
"Stream was cancelled (CancelledError). Attempting to process current event. "
f"Events received so far: {self.total_events_received}, last event: {self.last_event_type}. "
f"Error: %s, trace: %s",
e,
traceback.format_exc(),
)
async for message in self._process_chunk(chunk, ttft_span, prev_message_type, message_index):
new_message_type = message.message_type
if new_message_type != prev_message_type:
@@ -686,6 +729,13 @@ class SimpleOpenAIStreamingInterface:
# Don't raise the exception here
continue
# Stream iterator exited normally
logger.info(
f"Chat Completions stream iterator exited (SimpleOpenAIStreamingInterface). "
f"Received {self.total_events_received} events, "
f"last event: {self.last_event_type}"
)
except Exception as e:
import traceback
@@ -698,7 +748,12 @@ class SimpleOpenAIStreamingInterface:
yield LettaStopReason(stop_reason=StopReasonType.error)
raise e
finally:
logger.info("OpenAIStreamingInterface: Stream processing complete.")
logger.info(
f"SimpleOpenAIStreamingInterface: Stream processing complete. "
f"Received {self.total_events_received} events, "
f"last event: {self.last_event_type}, "
f"stream was cancelled: {self.stream_was_cancelled}"
)
async def _process_chunk(
self,
@@ -893,6 +948,11 @@ class SimpleOpenAIResponsesStreamingInterface:
# Raw usage from provider (for transparent logging in provider trace)
self.raw_usage: dict | None = None
# Diagnostic: track last event for debugging
self.last_event_type: str | None = None
self.total_events_received: int = 0
self.stream_was_cancelled: bool = False
# -------- Mapping helpers (no broad try/except) --------
def _record_tool_mapping(self, event: object, item: object) -> tuple[str | None, str | None, int | None, str | None]:
"""Record call_id/name mapping for this tool-call using output_index and item.id if present.
@@ -924,7 +984,14 @@ class SimpleOpenAIResponsesStreamingInterface:
def get_content(self) -> list[TextContent | SummarizedReasoningContent]:
"""This includes both SummarizedReasoningContent and TextContent"""
if self.final_response is None:
raise ValueError("No final response available")
logger.warning(
"No final response available - stream may have been interrupted or ResponseCompletedEvent was not received. "
f"Diagnostic info: received {self.total_events_received} events total, "
f"last event type: {self.last_event_type}, "
f"stream was cancelled: {self.stream_was_cancelled}. "
"Returning empty content list."
)
return []
content = []
for response in self.final_response.output:
@@ -996,6 +1063,7 @@ class SimpleOpenAIResponsesStreamingInterface:
prev_message_type = None
message_index = 0
try:
logger.info("Starting ResponsesAPI stream processing")
async with stream:
async for event in stream:
try:
@@ -1006,10 +1074,29 @@ class SimpleOpenAIResponsesStreamingInterface:
message_index += 1
prev_message_type = new_message_type
yield message
except (TypeError, AttributeError, KeyError, ValueError) as e:
# Event parsing/processing error - log and skip this event
import traceback
logger.error(
f"Error processing event {type(event).__name__} at position {self.total_events_received}: {e}. "
f"Event data: {event if hasattr(event, '__dict__') else str(event)[:500]}. "
f"Skipping this event and continuing stream.",
exc_info=True,
)
# Continue to next event rather than killing the stream
continue
except asyncio.CancelledError as e:
import traceback
logger.info("Cancelled stream attempt but overriding %s: %s", e, traceback.format_exc())
self.stream_was_cancelled = True
logger.warning(
"Stream was cancelled (CancelledError). Attempting to process current event. "
f"Events received so far: {self.total_events_received}, last event: {self.last_event_type}. "
f"Error: %s, trace: %s",
e,
traceback.format_exc(),
)
async for message in self._process_event(event, ttft_span, prev_message_type, message_index):
new_message_type = message.message_type
if new_message_type != prev_message_type:
@@ -1033,7 +1120,13 @@ class SimpleOpenAIResponsesStreamingInterface:
yield LettaStopReason(stop_reason=StopReasonType.error)
raise e
finally:
logger.info("OpenAIStreamingInterface: Stream processing complete.")
logger.info(
f"ResponsesAPI Stream processing complete. "
f"Received {self.total_events_received} events, "
f"last event: {self.last_event_type}, "
f"has final_response: {self.final_response is not None}, "
f"stream was cancelled: {self.stream_was_cancelled}"
)
async def _process_event(
self,
@@ -1042,6 +1135,9 @@ class SimpleOpenAIResponsesStreamingInterface:
prev_message_type: Optional[str] = None,
message_index: int = 0,
) -> AsyncGenerator[LettaMessage | LettaStopReason, None]:
# Track events for diagnostics
self.total_events_received += 1
self.last_event_type = type(event).__name__
if isinstance(event, ResponseCreatedEvent):
# No-op, just had the input events
return
@@ -1338,7 +1434,11 @@ class SimpleOpenAIResponsesStreamingInterface:
return
else:
logger.debug(f"Unhandled event: {event}")
event_type = type(event).__name__
logger.warning(f"Unhandled event type: {event_type}. Event details: {event if hasattr(event, '__dict__') else str(event)}")
# Check if this is an error event we should handle
if hasattr(event, "error") and event.error is not None:
logger.error(f"Stream error event received: {event.error}")
return