From e8d5922ff9a94922498fcbd59c54751b5711910b Mon Sep 17 00:00:00 2001
From: jnjpng
Date: Wed, 18 Feb 2026 15:44:22 -0800
Subject: [PATCH] fix(core): handle ResponseIncompleteEvent in OpenAI Responses
 API streaming (#9535)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* fix(core): handle ResponseIncompleteEvent in OpenAI Responses API streaming

When reasoning models (gpt-5.x) exhaust their max_output_tokens budget on
chain-of-thought reasoning, OpenAI emits a ResponseIncompleteEvent instead of
ResponseCompletedEvent. This was previously unhandled, causing final_response
to remain None — which meant get_content() and get_tool_call_objects()
returned empty results, silently dropping the partial response.

Now ResponseIncompleteEvent is handled identically to ResponseCompletedEvent
(extracting partial content, usage stats, and token details), with an
additional warning log indicating the incomplete reason.

* fix(core): propagate finish_reason for Responses API incomplete events

- Guard usage extraction against None usage payload in
  ResponseIncompleteEvent handler
- Add _finish_reason override to LettaLLMAdapter so streaming adapters can
  explicitly set finish_reason without a chat_completions_response
- Map incomplete_details.reason="max_output_tokens" to finish_reason="length"
  in SimpleLLMStreamAdapter, matching the Chat Completions API convention
- This allows the agent loop's _decide_continuation to correctly return
  stop_reason="max_tokens_exceeded" instead of "end_turn" when the model
  exhausts its output token budget on reasoning

* fix(core): handle empty content parts in incomplete ResponseOutputMessage

When a model hits max_output_tokens after starting a ResponseOutputMessage
but before producing any content parts, the message has content=[]. This
previously raised ValueError("Got 0 content parts, expected 1"). Now it logs
a warning and skips the empty message, allowing reasoning-only incomplete
responses to be processed cleanly.

* fix(core): map all incomplete reasons to finish_reason, not just
  max_output_tokens

Handle content_filter and any future unknown incomplete reasons from the
Responses API instead of silently leaving finish_reason as None.
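For illustration, a minimal standalone sketch of the reason-to-finish_reason
mapping this patch implements (the map_finish_reason helper and the
SimpleNamespace response stand-in are hypothetical, for demonstration only;
they are not Letta or OpenAI SDK types):

    from types import SimpleNamespace

    def map_finish_reason(resp) -> str | None:
        # Mirrors the SimpleLLMStreamAdapter logic: incomplete_details.reason
        # drives finish_reason, matching the Chat Completions convention.
        details = getattr(resp, "incomplete_details", None)
        reason = getattr(details, "reason", None) if details else None
        if reason == "max_output_tokens":
            return "length"
        if reason == "content_filter":
            return "content_filter"
        if reason is not None:
            return reason  # unknown reasons preserved as-is for diagnostics
        if getattr(resp, "status", None) == "completed":
            return "stop"
        return None

    # A response cut short by its output-token budget maps to "length",
    # which the agent loop then reports as stop_reason="max_tokens_exceeded".
    truncated = SimpleNamespace(
        status="incomplete",
        incomplete_details=SimpleNamespace(reason="max_output_tokens"),
    )
    assert map_finish_reason(truncated) == "length"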
---
 letta/adapters/letta_llm_adapter.py           |  3 +
 letta/adapters/simple_llm_stream_adapter.py   | 16 ++++
 .../interfaces/openai_streaming_interface.py  | 76 +++++++++++++------
 3 files changed, 73 insertions(+), 22 deletions(-)

diff --git a/letta/adapters/letta_llm_adapter.py b/letta/adapters/letta_llm_adapter.py
index d75c2367..49e99c49 100644
--- a/letta/adapters/letta_llm_adapter.py
+++ b/letta/adapters/letta_llm_adapter.py
@@ -55,6 +55,7 @@ class LettaLLMAdapter(ABC):
         self.usage: LettaUsageStatistics = LettaUsageStatistics()
         self.telemetry_manager: TelemetryManager = TelemetryManager()
         self.llm_request_finish_timestamp_ns: int | None = None
+        self._finish_reason: str | None = None
 
     @abstractmethod
     async def invoke_llm(
@@ -92,6 +93,8 @@ class LettaLLMAdapter(ABC):
         Returns:
             str | None: The finish_reason if available, None otherwise
         """
+        if self._finish_reason is not None:
+            return self._finish_reason
         if self.chat_completions_response and self.chat_completions_response.choices:
             return self.chat_completions_response.choices[0].finish_reason
         return None
diff --git a/letta/adapters/simple_llm_stream_adapter.py b/letta/adapters/simple_llm_stream_adapter.py
index 492260d8..26c054fd 100644
--- a/letta/adapters/simple_llm_stream_adapter.py
+++ b/letta/adapters/simple_llm_stream_adapter.py
@@ -198,6 +198,22 @@ class SimpleLLMStreamAdapter(LettaLLMStreamAdapter):
         # Store any additional data from the interface
         self.message_id = self.interface.letta_message_id
 
+        # Populate finish_reason for downstream continuation logic.
+        # In Responses streaming, max_output_tokens is expressed via incomplete_details.reason.
+        if hasattr(self.interface, "final_response") and self.interface.final_response is not None:
+            resp = self.interface.final_response
+            incomplete_details = getattr(resp, "incomplete_details", None)
+            incomplete_reason = getattr(incomplete_details, "reason", None) if incomplete_details else None
+            if incomplete_reason == "max_output_tokens":
+                self._finish_reason = "length"
+            elif incomplete_reason == "content_filter":
+                self._finish_reason = "content_filter"
+            elif incomplete_reason is not None:
+                # Unknown incomplete reason — preserve it as-is for diagnostics
+                self._finish_reason = incomplete_reason
+            elif getattr(resp, "status", None) == "completed":
+                self._finish_reason = "stop"
+
         # Log request and response data
         self.log_provider_trace(step_id=step_id, actor=actor)
 
diff --git a/letta/interfaces/openai_streaming_interface.py b/letta/interfaces/openai_streaming_interface.py
index 0e18f57d..8823513e 100644
--- a/letta/interfaces/openai_streaming_interface.py
+++ b/letta/interfaces/openai_streaming_interface.py
@@ -19,6 +19,7 @@ from openai.types.responses import (
     ResponseFunctionCallArgumentsDeltaEvent,
     ResponseFunctionCallArgumentsDoneEvent,
     ResponseFunctionToolCall,
+    ResponseIncompleteEvent,
     ResponseInProgressEvent,
     ResponseOutputItemAddedEvent,
     ResponseOutputItemDoneEvent,
@@ -1021,6 +1022,9 @@ class SimpleOpenAIResponsesStreamingInterface:
         self.last_event_type: str | None = None
         self.total_events_received: int = 0
         self.stream_was_cancelled: bool = False
+        # For downstream finish_reason mapping (e.g. max_output_tokens -> "length")
+        # None means no incomplete reason was observed.
+        self.incomplete_reason: str | None = None
 
     # -------- Mapping helpers (no broad try/except) --------
     def _record_tool_mapping(self, event: object, item: object) -> tuple[str | None, str | None, int | None, str | None]:
@@ -1081,6 +1085,10 @@ class SimpleOpenAIResponsesStreamingInterface:
                     text=response.content[0].text,
                 )
             )
+        elif len(response.content) == 0:
+            # Incomplete responses may have an output message with no content parts
+            # (model started the message item but hit max_output_tokens before producing text)
+            logger.warning("ResponseOutputMessage has 0 content parts (likely from an incomplete response), skipping.")
         else:
             raise ValueError(f"Got {len(response.content)} content parts, expected 1")
 
@@ -1487,31 +1495,55 @@
             return
 
         # Generic finish
-        elif isinstance(event, ResponseCompletedEvent):
-            # NOTE we can "rebuild" the final state of the stream using the values in here, instead of relying on the accumulators
+        elif isinstance(event, (ResponseCompletedEvent, ResponseIncompleteEvent)):
+            # ResponseIncompleteEvent has the same response structure as ResponseCompletedEvent,
+            # but indicates the response was cut short (e.g. due to max_output_tokens).
+            # We still extract the partial response and usage data so they aren't silently lost.
+            if isinstance(event, ResponseIncompleteEvent):
+                self.incomplete_reason = (
+                    getattr(event.response.incomplete_details, "reason", None) if event.response.incomplete_details else None
+                )
+                reason = self.incomplete_reason or "unknown"
+                logger.warning(
+                    f"OpenAI Responses API returned an incomplete response (reason: {reason}). "
+                    f"Model: {event.response.model}, output_tokens: {event.response.usage.output_tokens if event.response.usage else 'N/A'}. "
+                    f"The partial response content will still be used."
+                )
+
             self.final_response = event.response
             self.model = event.response.model
-            self.input_tokens = event.response.usage.input_tokens
-            self.output_tokens = event.response.usage.output_tokens
             self.message_id = event.response.id
-            # Store raw usage for transparent provider trace logging
-            try:
-                self.raw_usage = event.response.usage.model_dump(exclude_none=True)
-            except Exception as e:
-                logger.error(f"Failed to capture raw_usage from OpenAI Responses API: {e}")
-                self.raw_usage = None
-            # Capture cache token details (Responses API uses input_tokens_details)
-            # Use `is not None` to capture 0 values (meaning "provider reported 0 cached tokens")
-            if hasattr(event.response.usage, "input_tokens_details") and event.response.usage.input_tokens_details:
-                details = event.response.usage.input_tokens_details
-                if hasattr(details, "cached_tokens") and details.cached_tokens is not None:
-                    self.cached_tokens = details.cached_tokens
-            # Capture reasoning token details (Responses API uses output_tokens_details)
-            # Use `is not None` to capture 0 values (meaning "provider reported 0 reasoning tokens")
-            if hasattr(event.response.usage, "output_tokens_details") and event.response.usage.output_tokens_details:
-                details = event.response.usage.output_tokens_details
-                if hasattr(details, "reasoning_tokens") and details.reasoning_tokens is not None:
-                    self.reasoning_tokens = details.reasoning_tokens
+
+            usage = event.response.usage
+            if usage is not None:
+                self.input_tokens = usage.input_tokens
+                self.output_tokens = usage.output_tokens
+
+                # Store raw usage for transparent provider trace logging
+                try:
+                    self.raw_usage = usage.model_dump(exclude_none=True)
+                except Exception as e:
+                    logger.error(f"Failed to capture raw_usage from OpenAI Responses API: {e}")
+                    self.raw_usage = None
+
+                # Capture cache token details (Responses API uses input_tokens_details)
+                # Use `is not None` to capture 0 values (meaning "provider reported 0 cached tokens")
+                if hasattr(usage, "input_tokens_details") and usage.input_tokens_details:
+                    details = usage.input_tokens_details
+                    if hasattr(details, "cached_tokens") and details.cached_tokens is not None:
+                        self.cached_tokens = details.cached_tokens
+
+                # Capture reasoning token details (Responses API uses output_tokens_details)
+                # Use `is not None` to capture 0 values (meaning "provider reported 0 reasoning tokens")
+                if hasattr(usage, "output_tokens_details") and usage.output_tokens_details:
+                    details = usage.output_tokens_details
+                    if hasattr(details, "reasoning_tokens") and details.reasoning_tokens is not None:
+                        self.reasoning_tokens = details.reasoning_tokens
+            else:
+                logger.warning(
+                    "OpenAI Responses API finish event had no usage payload. "
+                    "Proceeding with partial response but token metrics may be incomplete."
+                )
             return
 
         else: