fix: user messages on new agent loop are not processed in ADE (includes new json parser) (#1934)
This commit is contained in:
@@ -35,7 +35,7 @@ from letta.schemas.letta_message import (
|
||||
from letta.schemas.letta_message_content import ReasoningContent, RedactedReasoningContent, TextContent
|
||||
from letta.schemas.message import Message
|
||||
from letta.schemas.openai.chat_completion_response import FunctionCall, ToolCall
|
||||
from letta.server.rest_api.optimistic_json_parser import OptimisticJSONParser
|
||||
from letta.server.rest_api.json_parser import JSONParser, PydanticJSONParser
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
@@ -56,7 +56,7 @@ class AnthropicStreamingInterface:
|
||||
"""
|
||||
|
||||
def __init__(self, use_assistant_message: bool = False, put_inner_thoughts_in_kwarg: bool = False):
|
||||
self.optimistic_json_parser: OptimisticJSONParser = OptimisticJSONParser()
|
||||
self.json_parser: JSONParser = PydanticJSONParser()
|
||||
self.use_assistant_message = use_assistant_message
|
||||
|
||||
# Premake IDs for database writes
|
||||
@@ -68,7 +68,7 @@ class AnthropicStreamingInterface:
|
||||
self.accumulated_inner_thoughts = []
|
||||
self.tool_call_id = None
|
||||
self.tool_call_name = None
|
||||
self.accumulated_tool_call_args = []
|
||||
self.accumulated_tool_call_args = ""
|
||||
self.previous_parse = {}
|
||||
|
||||
# usage trackers
|
||||
@@ -85,193 +85,200 @@ class AnthropicStreamingInterface:
|
||||
|
||||
def get_tool_call_object(self) -> ToolCall:
|
||||
"""Useful for agent loop"""
|
||||
return ToolCall(
|
||||
id=self.tool_call_id, function=FunctionCall(arguments="".join(self.accumulated_tool_call_args), name=self.tool_call_name)
|
||||
)
|
||||
return ToolCall(id=self.tool_call_id, function=FunctionCall(arguments=self.accumulated_tool_call_args, name=self.tool_call_name))
|
||||
|
||||
def _check_inner_thoughts_complete(self, combined_args: str) -> bool:
|
||||
"""
|
||||
Check if inner thoughts are complete in the current tool call arguments
|
||||
by looking for a closing quote after the inner_thoughts field
|
||||
"""
|
||||
if not self.put_inner_thoughts_in_kwarg:
|
||||
# None of the things should have inner thoughts in kwargs
|
||||
return True
|
||||
else:
|
||||
parsed = self.optimistic_json_parser.parse(combined_args)
|
||||
# TODO: This will break on tools with 0 input
|
||||
return len(parsed.keys()) > 1 and INNER_THOUGHTS_KWARG in parsed.keys()
|
||||
try:
|
||||
if not self.put_inner_thoughts_in_kwarg:
|
||||
# None of the things should have inner thoughts in kwargs
|
||||
return True
|
||||
else:
|
||||
parsed = self.json_parser.parse(combined_args)
|
||||
# TODO: This will break on tools with 0 input
|
||||
return len(parsed.keys()) > 1 and INNER_THOUGHTS_KWARG in parsed.keys()
|
||||
except Exception as e:
|
||||
logger.error("Error checking inner thoughts: %s", e)
|
||||
raise
|
||||
|
||||
async def process(self, stream: AsyncStream[BetaRawMessageStreamEvent]) -> AsyncGenerator[LettaMessage, None]:
|
||||
async with stream:
|
||||
async for event in stream:
|
||||
# TODO: Support BetaThinkingBlock, BetaRedactedThinkingBlock
|
||||
if isinstance(event, BetaRawContentBlockStartEvent):
|
||||
content = event.content_block
|
||||
try:
|
||||
async with stream:
|
||||
async for event in stream:
|
||||
# TODO: Support BetaThinkingBlock, BetaRedactedThinkingBlock
|
||||
if isinstance(event, BetaRawContentBlockStartEvent):
|
||||
content = event.content_block
|
||||
|
||||
if isinstance(content, BetaTextBlock):
|
||||
self.anthropic_mode = EventMode.TEXT
|
||||
# TODO: Can capture citations, etc.
|
||||
elif isinstance(content, BetaToolUseBlock):
|
||||
self.anthropic_mode = EventMode.TOOL_USE
|
||||
self.tool_call_id = content.id
|
||||
self.tool_call_name = content.name
|
||||
self.inner_thoughts_complete = False
|
||||
if isinstance(content, BetaTextBlock):
|
||||
self.anthropic_mode = EventMode.TEXT
|
||||
# TODO: Can capture citations, etc.
|
||||
elif isinstance(content, BetaToolUseBlock):
|
||||
self.anthropic_mode = EventMode.TOOL_USE
|
||||
self.tool_call_id = content.id
|
||||
self.tool_call_name = content.name
|
||||
self.inner_thoughts_complete = False
|
||||
|
||||
if not self.use_assistant_message:
|
||||
# Buffer the initial tool call message instead of yielding immediately
|
||||
tool_call_msg = ToolCallMessage(
|
||||
id=self.letta_tool_message_id,
|
||||
tool_call=ToolCallDelta(name=self.tool_call_name, tool_call_id=self.tool_call_id),
|
||||
if not self.use_assistant_message:
|
||||
# Buffer the initial tool call message instead of yielding immediately
|
||||
tool_call_msg = ToolCallMessage(
|
||||
id=self.letta_tool_message_id,
|
||||
tool_call=ToolCallDelta(name=self.tool_call_name, tool_call_id=self.tool_call_id),
|
||||
date=datetime.now(timezone.utc).isoformat(),
|
||||
)
|
||||
self.tool_call_buffer.append(tool_call_msg)
|
||||
elif isinstance(content, BetaThinkingBlock):
|
||||
self.anthropic_mode = EventMode.THINKING
|
||||
# TODO: Can capture signature, etc.
|
||||
elif isinstance(content, BetaRedactedThinkingBlock):
|
||||
self.anthropic_mode = EventMode.REDACTED_THINKING
|
||||
|
||||
hidden_reasoning_message = HiddenReasoningMessage(
|
||||
id=self.letta_assistant_message_id,
|
||||
state="redacted",
|
||||
hidden_reasoning=content.data,
|
||||
date=datetime.now(timezone.utc).isoformat(),
|
||||
)
|
||||
self.tool_call_buffer.append(tool_call_msg)
|
||||
elif isinstance(content, BetaThinkingBlock):
|
||||
self.anthropic_mode = EventMode.THINKING
|
||||
# TODO: Can capture signature, etc.
|
||||
elif isinstance(content, BetaRedactedThinkingBlock):
|
||||
self.anthropic_mode = EventMode.REDACTED_THINKING
|
||||
self.reasoning_messages.append(hidden_reasoning_message)
|
||||
yield hidden_reasoning_message
|
||||
|
||||
hidden_reasoning_message = HiddenReasoningMessage(
|
||||
id=self.letta_assistant_message_id,
|
||||
state="redacted",
|
||||
hidden_reasoning=content.data,
|
||||
date=datetime.now(timezone.utc).isoformat(),
|
||||
)
|
||||
self.reasoning_messages.append(hidden_reasoning_message)
|
||||
yield hidden_reasoning_message
|
||||
elif isinstance(event, BetaRawContentBlockDeltaEvent):
|
||||
delta = event.delta
|
||||
|
||||
elif isinstance(event, BetaRawContentBlockDeltaEvent):
|
||||
delta = event.delta
|
||||
if isinstance(delta, BetaTextDelta):
|
||||
# Safety check
|
||||
if not self.anthropic_mode == EventMode.TEXT:
|
||||
raise RuntimeError(
|
||||
f"Streaming integrity failed - received BetaTextDelta object while not in TEXT EventMode: {delta}"
|
||||
)
|
||||
|
||||
if isinstance(delta, BetaTextDelta):
|
||||
# Safety check
|
||||
if not self.anthropic_mode == EventMode.TEXT:
|
||||
raise RuntimeError(
|
||||
f"Streaming integrity failed - received BetaTextDelta object while not in TEXT EventMode: {delta}"
|
||||
)
|
||||
# TODO: Strip out </thinking> more robustly, this is pretty hacky lol
|
||||
delta.text = delta.text.replace("</thinking>", "")
|
||||
self.accumulated_inner_thoughts.append(delta.text)
|
||||
|
||||
# TODO: Strip out </thinking> more robustly, this is pretty hacky lol
|
||||
delta.text = delta.text.replace("</thinking>", "")
|
||||
self.accumulated_inner_thoughts.append(delta.text)
|
||||
|
||||
reasoning_message = ReasoningMessage(
|
||||
id=self.letta_assistant_message_id,
|
||||
reasoning=self.accumulated_inner_thoughts[-1],
|
||||
date=datetime.now(timezone.utc).isoformat(),
|
||||
)
|
||||
self.reasoning_messages.append(reasoning_message)
|
||||
yield reasoning_message
|
||||
|
||||
elif isinstance(delta, BetaInputJSONDelta):
|
||||
if not self.anthropic_mode == EventMode.TOOL_USE:
|
||||
raise RuntimeError(
|
||||
f"Streaming integrity failed - received BetaInputJSONDelta object while not in TOOL_USE EventMode: {delta}"
|
||||
)
|
||||
|
||||
self.accumulated_tool_call_args.append(delta.partial_json)
|
||||
combined_args = "".join(self.accumulated_tool_call_args)
|
||||
current_parsed = self.optimistic_json_parser.parse(combined_args)
|
||||
|
||||
# Start detecting a difference in inner thoughts
|
||||
previous_inner_thoughts = self.previous_parse.get(INNER_THOUGHTS_KWARG, "")
|
||||
current_inner_thoughts = current_parsed.get(INNER_THOUGHTS_KWARG, "")
|
||||
inner_thoughts_diff = current_inner_thoughts[len(previous_inner_thoughts) :]
|
||||
|
||||
if inner_thoughts_diff:
|
||||
reasoning_message = ReasoningMessage(
|
||||
id=self.letta_assistant_message_id,
|
||||
reasoning=inner_thoughts_diff,
|
||||
reasoning=self.accumulated_inner_thoughts[-1],
|
||||
date=datetime.now(timezone.utc).isoformat(),
|
||||
)
|
||||
self.reasoning_messages.append(reasoning_message)
|
||||
yield reasoning_message
|
||||
|
||||
# Check if inner thoughts are complete - if so, flush the buffer
|
||||
if not self.inner_thoughts_complete and self._check_inner_thoughts_complete(combined_args):
|
||||
self.inner_thoughts_complete = True
|
||||
# Flush all buffered tool call messages
|
||||
elif isinstance(delta, BetaInputJSONDelta):
|
||||
if not self.anthropic_mode == EventMode.TOOL_USE:
|
||||
raise RuntimeError(
|
||||
f"Streaming integrity failed - received BetaInputJSONDelta object while not in TOOL_USE EventMode: {delta}"
|
||||
)
|
||||
|
||||
self.accumulated_tool_call_args += delta.partial_json
|
||||
current_parsed = self.json_parser.parse(self.accumulated_tool_call_args)
|
||||
|
||||
# Start detecting a difference in inner thoughts
|
||||
previous_inner_thoughts = self.previous_parse.get(INNER_THOUGHTS_KWARG, "")
|
||||
current_inner_thoughts = current_parsed.get(INNER_THOUGHTS_KWARG, "")
|
||||
inner_thoughts_diff = current_inner_thoughts[len(previous_inner_thoughts) :]
|
||||
|
||||
if inner_thoughts_diff:
|
||||
reasoning_message = ReasoningMessage(
|
||||
id=self.letta_assistant_message_id,
|
||||
reasoning=inner_thoughts_diff,
|
||||
date=datetime.now(timezone.utc).isoformat(),
|
||||
)
|
||||
self.reasoning_messages.append(reasoning_message)
|
||||
yield reasoning_message
|
||||
|
||||
# Check if inner thoughts are complete - if so, flush the buffer
|
||||
if not self.inner_thoughts_complete and self._check_inner_thoughts_complete(self.accumulated_tool_call_args):
|
||||
self.inner_thoughts_complete = True
|
||||
# Flush all buffered tool call messages
|
||||
for buffered_msg in self.tool_call_buffer:
|
||||
yield buffered_msg
|
||||
self.tool_call_buffer = []
|
||||
|
||||
# Start detecting special case of "send_message"
|
||||
if self.tool_call_name == DEFAULT_MESSAGE_TOOL and self.use_assistant_message:
|
||||
previous_send_message = self.previous_parse.get(DEFAULT_MESSAGE_TOOL_KWARG, "")
|
||||
current_send_message = current_parsed.get(DEFAULT_MESSAGE_TOOL_KWARG, "")
|
||||
send_message_diff = current_send_message[len(previous_send_message) :]
|
||||
|
||||
# Only stream out if it's not an empty string
|
||||
if send_message_diff:
|
||||
yield AssistantMessage(
|
||||
id=self.letta_assistant_message_id,
|
||||
content=[TextContent(text=send_message_diff)],
|
||||
date=datetime.now(timezone.utc).isoformat(),
|
||||
)
|
||||
else:
|
||||
# Otherwise, it is a normal tool call - buffer or yield based on inner thoughts status
|
||||
tool_call_msg = ToolCallMessage(
|
||||
id=self.letta_tool_message_id,
|
||||
tool_call=ToolCallDelta(arguments=delta.partial_json),
|
||||
date=datetime.now(timezone.utc).isoformat(),
|
||||
)
|
||||
|
||||
if self.inner_thoughts_complete:
|
||||
yield tool_call_msg
|
||||
else:
|
||||
self.tool_call_buffer.append(tool_call_msg)
|
||||
|
||||
# Set previous parse
|
||||
self.previous_parse = current_parsed
|
||||
elif isinstance(delta, BetaThinkingDelta):
|
||||
# Safety check
|
||||
if not self.anthropic_mode == EventMode.THINKING:
|
||||
raise RuntimeError(
|
||||
f"Streaming integrity failed - received BetaThinkingBlock object while not in THINKING EventMode: {delta}"
|
||||
)
|
||||
|
||||
reasoning_message = ReasoningMessage(
|
||||
id=self.letta_assistant_message_id,
|
||||
source="reasoner_model",
|
||||
reasoning=delta.thinking,
|
||||
date=datetime.now(timezone.utc).isoformat(),
|
||||
)
|
||||
self.reasoning_messages.append(reasoning_message)
|
||||
yield reasoning_message
|
||||
elif isinstance(delta, BetaSignatureDelta):
|
||||
# Safety check
|
||||
if not self.anthropic_mode == EventMode.THINKING:
|
||||
raise RuntimeError(
|
||||
f"Streaming integrity failed - received BetaSignatureDelta object while not in THINKING EventMode: {delta}"
|
||||
)
|
||||
|
||||
reasoning_message = ReasoningMessage(
|
||||
id=self.letta_assistant_message_id,
|
||||
source="reasoner_model",
|
||||
reasoning="",
|
||||
date=datetime.now(timezone.utc).isoformat(),
|
||||
signature=delta.signature,
|
||||
)
|
||||
self.reasoning_messages.append(reasoning_message)
|
||||
yield reasoning_message
|
||||
elif isinstance(event, BetaRawMessageStartEvent):
|
||||
self.message_id = event.message.id
|
||||
self.input_tokens += event.message.usage.input_tokens
|
||||
self.output_tokens += event.message.usage.output_tokens
|
||||
elif isinstance(event, BetaRawMessageDeltaEvent):
|
||||
self.output_tokens += event.usage.output_tokens
|
||||
elif isinstance(event, BetaRawMessageStopEvent):
|
||||
# Don't do anything here! We don't want to stop the stream.
|
||||
pass
|
||||
elif isinstance(event, BetaRawContentBlockStopEvent):
|
||||
# If we're exiting a tool use block and there are still buffered messages,
|
||||
# we should flush them now
|
||||
if self.anthropic_mode == EventMode.TOOL_USE and self.tool_call_buffer:
|
||||
for buffered_msg in self.tool_call_buffer:
|
||||
yield buffered_msg
|
||||
self.tool_call_buffer = []
|
||||
|
||||
# Start detecting special case of "send_message"
|
||||
if self.tool_call_name == DEFAULT_MESSAGE_TOOL and self.use_assistant_message:
|
||||
previous_send_message = self.previous_parse.get(DEFAULT_MESSAGE_TOOL_KWARG, "")
|
||||
current_send_message = current_parsed.get(DEFAULT_MESSAGE_TOOL_KWARG, "")
|
||||
send_message_diff = current_send_message[len(previous_send_message) :]
|
||||
|
||||
# Only stream out if it's not an empty string
|
||||
if send_message_diff:
|
||||
yield AssistantMessage(
|
||||
id=self.letta_assistant_message_id,
|
||||
content=[TextContent(text=send_message_diff)],
|
||||
date=datetime.now(timezone.utc).isoformat(),
|
||||
)
|
||||
else:
|
||||
# Otherwise, it is a normal tool call - buffer or yield based on inner thoughts status
|
||||
tool_call_msg = ToolCallMessage(
|
||||
id=self.letta_tool_message_id,
|
||||
tool_call=ToolCallDelta(arguments=delta.partial_json),
|
||||
date=datetime.now(timezone.utc).isoformat(),
|
||||
)
|
||||
|
||||
if self.inner_thoughts_complete:
|
||||
yield tool_call_msg
|
||||
else:
|
||||
self.tool_call_buffer.append(tool_call_msg)
|
||||
|
||||
# Set previous parse
|
||||
self.previous_parse = current_parsed
|
||||
elif isinstance(delta, BetaThinkingDelta):
|
||||
# Safety check
|
||||
if not self.anthropic_mode == EventMode.THINKING:
|
||||
raise RuntimeError(
|
||||
f"Streaming integrity failed - received BetaThinkingBlock object while not in THINKING EventMode: {delta}"
|
||||
)
|
||||
|
||||
reasoning_message = ReasoningMessage(
|
||||
id=self.letta_assistant_message_id,
|
||||
source="reasoner_model",
|
||||
reasoning=delta.thinking,
|
||||
date=datetime.now(timezone.utc).isoformat(),
|
||||
)
|
||||
self.reasoning_messages.append(reasoning_message)
|
||||
yield reasoning_message
|
||||
elif isinstance(delta, BetaSignatureDelta):
|
||||
# Safety check
|
||||
if not self.anthropic_mode == EventMode.THINKING:
|
||||
raise RuntimeError(
|
||||
f"Streaming integrity failed - received BetaSignatureDelta object while not in THINKING EventMode: {delta}"
|
||||
)
|
||||
|
||||
reasoning_message = ReasoningMessage(
|
||||
id=self.letta_assistant_message_id,
|
||||
source="reasoner_model",
|
||||
reasoning="",
|
||||
date=datetime.now(timezone.utc).isoformat(),
|
||||
signature=delta.signature,
|
||||
)
|
||||
self.reasoning_messages.append(reasoning_message)
|
||||
yield reasoning_message
|
||||
elif isinstance(event, BetaRawMessageStartEvent):
|
||||
self.message_id = event.message.id
|
||||
self.input_tokens += event.message.usage.input_tokens
|
||||
self.output_tokens += event.message.usage.output_tokens
|
||||
elif isinstance(event, BetaRawMessageDeltaEvent):
|
||||
self.output_tokens += event.usage.output_tokens
|
||||
elif isinstance(event, BetaRawMessageStopEvent):
|
||||
# Don't do anything here! We don't want to stop the stream.
|
||||
pass
|
||||
elif isinstance(event, BetaRawContentBlockStopEvent):
|
||||
# If we're exiting a tool use block and there are still buffered messages,
|
||||
# we should flush them now
|
||||
if self.anthropic_mode == EventMode.TOOL_USE and self.tool_call_buffer:
|
||||
for buffered_msg in self.tool_call_buffer:
|
||||
yield buffered_msg
|
||||
self.tool_call_buffer = []
|
||||
|
||||
self.anthropic_mode = None
|
||||
self.anthropic_mode = None
|
||||
except Exception as e:
|
||||
logger.error("Error processing stream: %s", e)
|
||||
raise
|
||||
finally:
|
||||
logger.info("AnthropicStreamingInterface: Stream processing complete.")
|
||||
|
||||
def get_reasoning_content(self) -> List[Union[TextContent, ReasoningContent, RedactedReasoningContent]]:
|
||||
def _process_group(
|
||||
|
||||
@@ -5,7 +5,7 @@ from openai.types.chat.chat_completion_chunk import ChatCompletionChunk, Choice,
|
||||
|
||||
from letta.constants import PRE_EXECUTION_MESSAGE_ARG
|
||||
from letta.interfaces.utils import _format_sse_chunk
|
||||
from letta.server.rest_api.optimistic_json_parser import OptimisticJSONParser
|
||||
from letta.server.rest_api.json_parser import OptimisticJSONParser
|
||||
|
||||
|
||||
class OpenAIChatCompletionsStreamingInterface:
|
||||
|
||||
Reference in New Issue
Block a user