From 6f57ae829a1a9be55359a1b668a12f4895eb50a2 Mon Sep 17 00:00:00 2001 From: Matthew Zhou Date: Tue, 4 Nov 2025 18:07:46 -0800 Subject: [PATCH] fix: Reduce string growth for anthropic (#5974) Reduce string growth for anthropic --- ..._parallel_tool_call_streaming_interface.py | 30 ++++++++---- .../anthropic_streaming_interface.py | 48 +++++++++++-------- 2 files changed, 48 insertions(+), 30 deletions(-) diff --git a/letta/interfaces/anthropic_parallel_tool_call_streaming_interface.py b/letta/interfaces/anthropic_parallel_tool_call_streaming_interface.py index 370f900f..6a819e42 100644 --- a/letta/interfaces/anthropic_parallel_tool_call_streaming_interface.py +++ b/letta/interfaces/anthropic_parallel_tool_call_streaming_interface.py @@ -111,8 +111,8 @@ class SimpleAnthropicStreamingInterface: # Collected finalized tool calls (supports parallel tool use) self.collected_tool_calls: list[ToolCall] = [] # Track active tool_use blocks by stream index for parallel tool calling - # { index: {"id": str, "name": str, "args": str} } - self.active_tool_uses: dict[int, dict[str, str]] = {} + # { index: {"id": str, "name": str, "args_parts": list[str]} } + self.active_tool_uses: dict[int, dict[str, object]] = {} # Maintain start order and indexed collection for stable ordering self._tool_use_start_order: list[int] = [] self._collected_indexed: list[tuple[int, ToolCall]] = [] @@ -154,13 +154,13 @@ class SimpleAnthropicStreamingInterface: redacted_text = "".join(chunk.hidden_reasoning for chunk in group if chunk.hidden_reasoning is not None) return RedactedReasoningContent(data=redacted_text) elif group_type == "text": - concat = "" + parts: list[str] = [] for chunk in group: if isinstance(chunk.content, list): - concat += "".join([c.text for c in chunk.content]) + parts.append("".join([c.text for c in chunk.content])) else: - concat += chunk.content - return TextContent(text=concat) + parts.append(chunk.content) + return TextContent(text="".join(parts)) else: raise ValueError("Unexpected group type") @@ -274,7 +274,7 @@ class SimpleAnthropicStreamingInterface: elif isinstance(content, BetaToolUseBlock): # New tool_use block started at this index self.anthropic_mode = EventMode.TOOL_USE - self.active_tool_uses[event.index] = {"id": content.id, "name": content.name, "args": ""} + self.active_tool_uses[event.index] = {"id": content.id, "name": content.name, "args_parts": []} if event.index not in self._tool_use_start_order: self._tool_use_start_order.append(event.index) @@ -366,12 +366,21 @@ class SimpleAnthropicStreamingInterface: ctx = self.active_tool_uses.get(event.index) if ctx is None: # Defensive: initialize if missing - self.active_tool_uses[event.index] = {"id": self.tool_call_id or "", "name": self.tool_call_name or "", "args": ""} + self.active_tool_uses[event.index] = { + "id": self.tool_call_id or "", + "name": self.tool_call_name or "", + "args_parts": [], + } ctx = self.active_tool_uses[event.index] # Append only non-empty partials if delta.partial_json: - ctx["args"] += delta.partial_json + # Append fragment to args_parts to avoid O(n^2) string growth + args_parts = ctx.get("args_parts") if isinstance(ctx.get("args_parts"), list) else None + if args_parts is None: + args_parts = [] + ctx["args_parts"] = args_parts + args_parts.append(delta.partial_json) else: # Skip streaming a no-op delta to prevent duplicate placeholders in UI return @@ -465,7 +474,8 @@ class SimpleAnthropicStreamingInterface: # Finalize the tool_use block at this index using accumulated deltas ctx = self.active_tool_uses.pop(event.index, None) if ctx is not None and ctx.get("id") and ctx.get("name") is not None: - raw_args = ctx.get("args", "") + parts = ctx.get("args_parts") if isinstance(ctx.get("args_parts"), list) else None + raw_args = "".join(parts) if parts else "" try: # Prefer strict JSON load, fallback to permissive parser tool_input = json.loads(raw_args) if raw_args else {} diff --git a/letta/interfaces/anthropic_streaming_interface.py b/letta/interfaces/anthropic_streaming_interface.py index 708eaa17..970f9e7a 100644 --- a/letta/interfaces/anthropic_streaming_interface.py +++ b/letta/interfaces/anthropic_streaming_interface.py @@ -81,7 +81,8 @@ class AnthropicStreamingInterface: self.accumulated_inner_thoughts = [] self.tool_call_id = None self.tool_call_name = None - self.accumulated_tool_call_args = "" + # Accumulate tool-call args as parts to avoid O(n^2) + self._accumulated_tool_call_args_parts: list[str] = [] self.previous_parse = {} # usage trackers @@ -108,15 +109,16 @@ class AnthropicStreamingInterface: raise ValueError("No tool call returned") # hack for tool rules try: - tool_input = json.loads(self.accumulated_tool_call_args) + args_str = "".join(self._accumulated_tool_call_args_parts) + tool_input = json.loads(args_str) except json.JSONDecodeError as e: # Attempt to use OptimisticJSONParser to handle incomplete/malformed JSON try: - tool_input = self.json_parser.parse(self.accumulated_tool_call_args) + tool_input = self.json_parser.parse(args_str) except: logger.warning( f"Failed to decode tool call arguments for tool_call_id={self.tool_call_id}, " - f"name={self.tool_call_name}. Raw input: {self.accumulated_tool_call_args!r}. Error: {e}" + f"name={self.tool_call_name}. Raw input: {args_str!r}. Error: {e}" ) raise e if "id" in tool_input and tool_input["id"].startswith("toolu_") and "function" in tool_input: @@ -361,8 +363,9 @@ class AnthropicStreamingInterface: f"Streaming integrity failed - received BetaInputJSONDelta object while not in TOOL_USE EventMode: {delta}" ) - self.accumulated_tool_call_args += delta.partial_json - current_parsed = self.json_parser.parse(self.accumulated_tool_call_args) + if delta.partial_json: + self._accumulated_tool_call_args_parts.append(delta.partial_json) + current_parsed = self.json_parser.parse("".join(self._accumulated_tool_call_args_parts)) # Start detecting a difference in inner thoughts previous_inner_thoughts = self.previous_parse.get(INNER_THOUGHTS_KWARG, "") @@ -385,7 +388,9 @@ class AnthropicStreamingInterface: yield reasoning_message # Check if inner thoughts are complete - if so, flush the buffer or create approval message - if not self.inner_thoughts_complete and self._check_inner_thoughts_complete(self.accumulated_tool_call_args): + if not self.inner_thoughts_complete and self._check_inner_thoughts_complete( + "".join(self._accumulated_tool_call_args_parts) + ): self.inner_thoughts_complete = True # Check if this tool requires approval @@ -395,7 +400,7 @@ class AnthropicStreamingInterface: message_index += 1 # Strip out inner thoughts from arguments - tool_call_args = self.accumulated_tool_call_args + tool_call_args = "".join(self._accumulated_tool_call_args_parts) if current_inner_thoughts: tool_call_args = tool_call_args.replace(f'"{INNER_THOUGHTS_KWARG}": "{current_inner_thoughts}"', "") @@ -419,9 +424,10 @@ class AnthropicStreamingInterface: message_index += 1 # Strip out the inner thoughts from the buffered tool call arguments before streaming - tool_call_args = "" - for buffered_msg in self.tool_call_buffer: - tool_call_args += buffered_msg.tool_call.arguments if buffered_msg.tool_call.arguments else "" + parts = [ + buffered_msg.tool_call.arguments for buffered_msg in self.tool_call_buffer if buffered_msg.tool_call.arguments + ] + tool_call_args = "".join(parts) tool_call_args = tool_call_args.replace(f'"{INNER_THOUGHTS_KWARG}": "{current_inner_thoughts}"', "") tool_call_delta = ToolCallDelta( @@ -578,7 +584,7 @@ class SimpleAnthropicStreamingInterface: self.accumulated_inner_thoughts = [] self.tool_call_id = None self.tool_call_name = None - self.accumulated_tool_call_args = "" + self._accumulated_tool_call_args_parts: list[str] = [] self.previous_parse = {} # usage trackers @@ -608,15 +614,16 @@ class SimpleAnthropicStreamingInterface: # hack for tool rules try: - tool_input = json.loads(self.accumulated_tool_call_args) + args_str = "".join(self._accumulated_tool_call_args_parts) + tool_input = json.loads(args_str) except json.JSONDecodeError as e: # Attempt to use OptimisticJSONParser to handle incomplete/malformed JSON try: - tool_input = self.json_parser.parse(self.accumulated_tool_call_args) + tool_input = self.json_parser.parse(args_str) except: logger.warning( f"Failed to decode tool call arguments for tool_call_id={self.tool_call_id}, " - f"name={self.tool_call_name}. Raw input: {self.accumulated_tool_call_args!r}. Error: {e}" + f"name={self.tool_call_name}. Raw input: {args_str!r}. Error: {e}" ) raise e if "id" in tool_input and tool_input["id"].startswith("toolu_") and "function" in tool_input: @@ -642,13 +649,13 @@ class SimpleAnthropicStreamingInterface: redacted_text = "".join(chunk.hidden_reasoning for chunk in group if chunk.hidden_reasoning is not None) return RedactedReasoningContent(data=redacted_text) elif group_type == "text": - concat = "" + parts: list[str] = [] for chunk in group: if isinstance(chunk.content, list): - concat += "".join([c.text for c in chunk.content]) + parts.append("".join([c.text for c in chunk.content])) else: - concat += chunk.content - return TextContent(text=concat) + parts.append(chunk.content) + return TextContent(text="".join(parts)) else: raise ValueError("Unexpected group type") @@ -852,7 +859,8 @@ class SimpleAnthropicStreamingInterface: f"Streaming integrity failed - received BetaInputJSONDelta object while not in TOOL_USE EventMode: {delta}" ) - self.accumulated_tool_call_args += delta.partial_json + if delta.partial_json: + self._accumulated_tool_call_args_parts.append(delta.partial_json) if self.tool_call_name in self.requires_approval_tools: if prev_message_type and prev_message_type != "approval_request_message":