diff --git a/letta/server/rest_api/interface.py b/letta/server/rest_api/interface.py index 84c23d25..b76c78a3 100644 --- a/letta/server/rest_api/interface.py +++ b/letta/server/rest_api/interface.py @@ -295,6 +295,25 @@ class StreamingServerInterface(AgentChunkStreamingInterface): self.optimistic_json_parser = OptimisticJSONParser() self.current_json_parse_result = {} + # NOTE (fix): OpenAI deltas may split a key and its value across chunks + # (e.g. '"request_heartbeat"' in one chunk, ': true' in the next). The + # old behavior passed through each fragment verbatim, which could emit + # a bare key (or a key+opening quote) without its value, producing + # invalid JSON slices and the "missing end-quote" symptom downstream. + # + # To make streamed arguments robust, we add a JSON-aware incremental + # reader that only releases safe updates for the "main" JSON portion of + # the tool_call arguments. This prevents partial-key emissions while + # preserving incremental streaming for consumers. + # + # We still stream 'name' fragments as-is (safe), but 'arguments' are + # parsed incrementally and emitted only when a boundary is safe. + self._raw_args_reader = JSONInnerThoughtsExtractor( + inner_thoughts_key=inner_thoughts_kwarg, + wait_for_first_key=False, + ) + self._raw_args_tool_call_id = None + # Store metadata passed from server self.metadata = {} @@ -654,11 +673,24 @@ class StreamingServerInterface(AgentChunkStreamingInterface): tool_call_delta = {} if tool_call.id: tool_call_delta["id"] = tool_call.id + # Reset raw args reader per tool_call id + if self._raw_args_tool_call_id != tool_call.id: + self._raw_args_tool_call_id = tool_call.id + self._raw_args_reader = JSONInnerThoughtsExtractor( + inner_thoughts_key=self.inner_thoughts_kwarg, + wait_for_first_key=False, + ) if tool_call.function: - if tool_call.function.arguments: - tool_call_delta["arguments"] = tool_call.function.arguments + # Stream name fragments as-is (names are short and harmless to emit) if tool_call.function.name: tool_call_delta["name"] = tool_call.function.name + # For arguments, incrementally parse to avoid emitting partial keys + if tool_call.function.arguments: + self.current_function_arguments += tool_call.function.arguments + updates_main_json, _ = self._raw_args_reader.process_fragment(tool_call.function.arguments) + # Only emit argument updates when a safe boundary is reached + if updates_main_json: + tool_call_delta["arguments"] = updates_main_json # We might end up with a no-op, in which case we should omit if ( diff --git a/letta/streaming_utils.py b/letta/streaming_utils.py index f1b84f2f..a34f4577 100644 --- a/letta/streaming_utils.py +++ b/letta/streaming_utils.py @@ -99,6 +99,15 @@ class JSONInnerThoughtsExtractor: else: updates_main_json += c self.main_buffer += c + # NOTE (fix): Streaming JSON can arrive token-by-token from the LLM. + # In the old implementation we pre-inserted an opening quote after every + # key's colon (i.e. we emitted '"key":"' immediately). That implicitly + # assumed all values are strings. When a non-string value (e.g. true/false, + # numbers, null, or a nested object/array) streamed in next, the stream + # ended up with an unmatched '"' and appeared as a "missing end-quote" to + # clients. We now only emit an opening quote when we actually enter a + # string value (see below). This keeps values like booleans unquoted and + # avoids generating dangling quotes mid-stream. elif c == '"': if not self.escaped: self.in_string = not self.in_string @@ -112,6 +121,14 @@ class JSONInnerThoughtsExtractor: self.main_buffer += self.main_json_held_buffer self.main_json_held_buffer = "" self.hold_main_json = False + elif self.state == "value": + # Opening quote for a string value (non-inner-thoughts only) + if not self.is_inner_thoughts_value: + if self.hold_main_json: + self.main_json_held_buffer += '"' + else: + updates_main_json += '"' + self.main_buffer += '"' else: if self.state == "key": self.state = "colon" @@ -156,18 +173,26 @@ class JSONInnerThoughtsExtractor: updates_main_json += c self.main_buffer += c else: + # NOTE (fix): Do NOT pre-insert an opening quote after ':' any more. + # The value may not be a string; we only emit quotes when we actually + # see a string begin (handled in the '"' branch above). This prevents + # forced-quoting of non-string values and eliminates the common + # streaming artifact of "... 'request_heartbeat':'true}" missing the + # final quote. if c == ":" and self.state == "colon": + # Transition to reading a value; don't pre-insert quotes self.state = "value" self.is_inner_thoughts_value = self.current_key == self.inner_thoughts_key if self.is_inner_thoughts_value: - pass # Do not include 'inner_thoughts' key in main_json + # Do not include 'inner_thoughts' key in main_json + pass else: key_colon = f'"{self.current_key}":' if self.hold_main_json: - self.main_json_held_buffer += key_colon + '"' + self.main_json_held_buffer += key_colon else: - updates_main_json += key_colon + '"' - self.main_buffer += key_colon + '"' + updates_main_json += key_colon + self.main_buffer += key_colon elif c == "," and self.state == "comma_or_end": if self.is_inner_thoughts_value: # Inner thoughts value ended