diff --git a/letta/interfaces/openai_streaming_interface.py b/letta/interfaces/openai_streaming_interface.py index 32fa9933..51754c8b 100644 --- a/letta/interfaces/openai_streaming_interface.py +++ b/letta/interfaces/openai_streaming_interface.py @@ -147,6 +147,11 @@ class OpenAIStreamingInterface: def _get_function_id_buffer(self) -> str | None: return "".join(self._function_id_parts) if self._function_id_parts else None + def _get_current_function_id(self) -> str | None: + """Prefer the last flushed ID when the live buffer is empty. + Ensures tool_call_id is present on subsequent argument deltas after name/id flush.""" + return self.last_flushed_function_id if self.last_flushed_function_id else self._get_function_id_buffer() + def _clear_function_buffers(self) -> None: self._function_name_parts = [] self._function_id_parts = [] @@ -349,7 +354,7 @@ class OpenAIStreamingInterface: tool_call=ToolCallDelta( name=self._get_function_name_buffer(), arguments=None, - tool_call_id=self._get_function_id_buffer(), + tool_call_id=self._get_current_function_id(), ), otid=Message.generate_otid_from_id(decrement_message_uuid(self.letta_message_id), -1), run_id=self.run_id, @@ -359,7 +364,7 @@ class OpenAIStreamingInterface: tool_call_delta = ToolCallDelta( name=self._get_function_name_buffer(), arguments=None, - tool_call_id=self._get_function_id_buffer(), + tool_call_id=self._get_current_function_id(), ) tool_call_msg = ToolCallMessage( id=self.letta_message_id, @@ -375,8 +380,8 @@ class OpenAIStreamingInterface: # Record what the last function name we flushed was self.last_flushed_function_name = self._get_function_name_buffer() - if self.last_flushed_function_id is None: - self.last_flushed_function_id = self._get_function_id_buffer() + # Always refresh flushed id to current buffer for this tool call + self.last_flushed_function_id = self._get_function_id_buffer() # Clear the buffer self._clear_function_buffers() # Since we're clearing the name buffer, we should store @@ -436,7 +441,7 @@ class OpenAIStreamingInterface: tool_call=ToolCallDelta( name=self._get_function_name_buffer(), arguments=combined_chunk, - tool_call_id=self._get_function_id_buffer(), + tool_call_id=self._get_current_function_id(), ), # name=name, otid=Message.generate_otid_from_id(decrement_message_uuid(self.letta_message_id), -1), @@ -447,7 +452,7 @@ class OpenAIStreamingInterface: tool_call_delta = ToolCallDelta( name=self._get_function_name_buffer(), arguments=combined_chunk, - tool_call_id=self._get_function_id_buffer(), + tool_call_id=self._get_current_function_id(), ) tool_call_msg = ToolCallMessage( id=self.letta_message_id, @@ -475,7 +480,7 @@ class OpenAIStreamingInterface: tool_call=ToolCallDelta( name=None, arguments=updates_main_json, - tool_call_id=self._get_function_id_buffer(), + tool_call_id=self._get_current_function_id(), ), # name=name, otid=Message.generate_otid_from_id(decrement_message_uuid(self.letta_message_id), -1), @@ -486,7 +491,7 @@ class OpenAIStreamingInterface: tool_call_delta = ToolCallDelta( name=None, arguments=updates_main_json, - tool_call_id=self._get_function_id_buffer(), + tool_call_id=self._get_current_function_id(), ) tool_call_msg = ToolCallMessage( id=self.letta_message_id, @@ -769,10 +774,16 @@ class SimpleOpenAIStreamingInterface: if tool_call.id: acc["id_parts"].append(tool_call.id) + # Resolve stable id from accumulator; OpenAI may omit id on argument-only deltas + resolved_id = "".join(acc.get("id_parts", [])) if acc.get("id_parts") else None + # If we don't yet have an id for this tool_call index, skip emitting unusable delta + if resolved_id is None: + continue + delta = ToolCallDelta( name=tool_call.function.name if (tool_call.function and tool_call.function.name) else None, arguments=tool_call.function.arguments if (tool_call.function and tool_call.function.arguments) else None, - tool_call_id=tool_call.id if tool_call.id else None, + tool_call_id=resolved_id, ) _curr_name = "".join(acc.get("name_parts", [])) if "name_parts" in acc else acc.get("name", "") @@ -822,6 +833,9 @@ class SimpleOpenAIResponsesStreamingInterface: self.requires_approval_tools = requires_approval_tools # We need to store the name for approvals self.tool_call_name = None + # Responses API parallel tool call tracking: map output_index/item_id -> (call_id, name) + self._tool_map_by_output_index: dict[int, tuple[str | None, str | None]] = {} + self._tool_map_by_item_id: dict[str, tuple[str | None, str | None]] = {} # ID responses used self.message_id = None self.run_id = run_id @@ -832,6 +846,34 @@ class SimpleOpenAIResponsesStreamingInterface: self.model = model self.final_response: Optional[ParsedResponse] = None + # -------- Mapping helpers (no broad try/except) -------- + def _record_tool_mapping(self, event: object, item: object) -> tuple[str | None, str | None, int | None, str | None]: + """Record call_id/name mapping for this tool-call using output_index and item.id if present. + Returns (call_id, name, output_index, item_id).""" + call_id = getattr(item, "call_id", None) + name = getattr(item, "name", None) + output_index = getattr(event, "output_index", None) + item_id = getattr(item, "id", None) + if isinstance(output_index, int): + self._tool_map_by_output_index[output_index] = (call_id, name) + if isinstance(item_id, str) and item_id: + self._tool_map_by_item_id[item_id] = (call_id, name) + return call_id, name, output_index if isinstance(output_index, int) else None, item_id if isinstance(item_id, str) else None + + def _resolve_mapping_for_delta(self, event: object) -> tuple[str | None, str | None, int | None, str | None]: + """Resolve (call_id, name) for an arguments-delta event. Returns mapping plus keys used.""" + output_index = getattr(event, "output_index", None) + if isinstance(output_index, int) and output_index in self._tool_map_by_output_index: + call_id, name = self._tool_map_by_output_index[output_index] + return call_id, name, output_index, None + item_id = getattr(event, "item_id", None) + if isinstance(item_id, str) and item_id in self._tool_map_by_item_id: + call_id, name = self._tool_map_by_item_id[item_id] + return call_id, name, None, item_id + return None, None, output_index if isinstance(output_index, int) else None, item_id if isinstance(item_id, str) else None + + # (No buffering: we rely on Responses event order — tool_call added before arg deltas.) + def get_content(self) -> list[TextContent | SummarizedReasoningContent]: """This includes both SummarizedReasoningContent and TextContent""" if self.final_response is None: @@ -996,6 +1038,8 @@ class SimpleOpenAIResponsesStreamingInterface: arguments = new_event_item.arguments # cache for approval if/elses self.tool_call_name = name + # Record mapping so subsequent argument deltas can be associated + self._record_tool_mapping(event, new_event_item) if self.tool_call_name and self.tool_call_name in self.requires_approval_tools: yield ApprovalRequestMessage( id=decrement_message_uuid(self.letta_message_id), @@ -1163,7 +1207,19 @@ class SimpleOpenAIResponsesStreamingInterface: # only includes delta on args delta = event.delta - if self.tool_call_name and self.tool_call_name in self.requires_approval_tools: + # Resolve tool_call_id/name using output_index or item_id + resolved_call_id, resolved_name, out_idx, item_id = self._resolve_mapping_for_delta(event) + + # Fallback to last seen tool name for approval routing if mapping name missing + if not resolved_name: + resolved_name = self.tool_call_name + + if resolved_call_id is None: + # Mapping not yet available (unexpected); skip emitting unusable delta + return + + # We have a call id; emit approval or tool-call message accordingly + if resolved_name and resolved_name in self.requires_approval_tools: yield ApprovalRequestMessage( id=decrement_message_uuid(self.letta_message_id), otid=Message.generate_otid_from_id(decrement_message_uuid(self.letta_message_id), -1), @@ -1171,7 +1227,7 @@ class SimpleOpenAIResponsesStreamingInterface: tool_call=ToolCallDelta( name=None, arguments=delta, - tool_call_id=None, + tool_call_id=resolved_call_id, ), run_id=self.run_id, step_id=self.step_id, @@ -1182,7 +1238,7 @@ class SimpleOpenAIResponsesStreamingInterface: tool_call_delta = ToolCallDelta( name=None, arguments=delta, - tool_call_id=None, + tool_call_id=resolved_call_id, ) yield ToolCallMessage( id=self.letta_message_id,