feat: add run id to streamed messages (#5037)

This commit is contained in:
cthomas
2025-09-30 16:54:00 -07:00
committed by Caren Thomas
parent 6255d59bba
commit 67f8e46619
6 changed files with 60 additions and 1 deletions

View File

@@ -66,9 +66,11 @@ class AnthropicStreamingInterface:
use_assistant_message: bool = False,
put_inner_thoughts_in_kwarg: bool = False,
requires_approval_tools: list = [],
run_id: str | None = None,
):
self.json_parser: JSONParser = PydanticJSONParser()
self.use_assistant_message = use_assistant_message
self.run_id = run_id
# Premake IDs for database writes
self.letta_message_id = Message.generate_id()
@@ -280,6 +282,7 @@ class AnthropicStreamingInterface:
tool_call=ToolCallDelta(name=self.tool_call_name, tool_call_id=self.tool_call_id),
date=datetime.now(timezone.utc).isoformat(),
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
self.tool_call_buffer.append(tool_call_msg)
elif isinstance(content, BetaThinkingBlock):
@@ -295,6 +298,7 @@ class AnthropicStreamingInterface:
hidden_reasoning=content.data,
date=datetime.now(timezone.utc).isoformat(),
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
self.reasoning_messages.append(hidden_reasoning_message)
prev_message_type = hidden_reasoning_message.message_type
@@ -340,6 +344,7 @@ class AnthropicStreamingInterface:
reasoning=self.accumulated_inner_thoughts[-1],
date=datetime.now(timezone.utc).isoformat(),
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
self.reasoning_messages.append(reasoning_message)
prev_message_type = reasoning_message.message_type
@@ -367,6 +372,7 @@ class AnthropicStreamingInterface:
reasoning=inner_thoughts_diff,
date=datetime.now(timezone.utc).isoformat(),
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
self.reasoning_messages.append(reasoning_message)
prev_message_type = reasoning_message.message_type
@@ -397,6 +403,7 @@ class AnthropicStreamingInterface:
tool_call_id=self.tool_call_id,
arguments=tool_call_args,
),
run_id=self.run_id,
)
prev_message_type = approval_msg.message_type
yield approval_msg
@@ -420,6 +427,7 @@ class AnthropicStreamingInterface:
tool_call_id=self.tool_call_id,
arguments=tool_call_args,
),
run_id=self.run_id,
)
prev_message_type = tool_call_msg.message_type
yield tool_call_msg
@@ -440,6 +448,7 @@ class AnthropicStreamingInterface:
content=[TextContent(text=send_message_diff)],
date=datetime.now(timezone.utc).isoformat(),
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
prev_message_type = assistant_msg.message_type
yield assistant_msg
@@ -450,12 +459,14 @@ class AnthropicStreamingInterface:
id=self.letta_message_id,
tool_call=ToolCallDelta(name=self.tool_call_name, tool_call_id=self.tool_call_id, arguments=delta.partial_json),
date=datetime.now(timezone.utc).isoformat(),
run_id=self.run_id,
)
else:
tool_call_msg = ToolCallMessage(
id=self.letta_message_id,
tool_call=ToolCallDelta(name=self.tool_call_name, tool_call_id=self.tool_call_id, arguments=delta.partial_json),
date=datetime.now(timezone.utc).isoformat(),
run_id=self.run_id,
)
if self.inner_thoughts_complete:
if prev_message_type and prev_message_type != "tool_call_message":
@@ -483,6 +494,7 @@ class AnthropicStreamingInterface:
reasoning=delta.thinking,
date=datetime.now(timezone.utc).isoformat(),
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
self.reasoning_messages.append(reasoning_message)
prev_message_type = reasoning_message.message_type
@@ -503,6 +515,7 @@ class AnthropicStreamingInterface:
date=datetime.now(timezone.utc).isoformat(),
signature=delta.signature,
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
self.reasoning_messages.append(reasoning_message)
prev_message_type = reasoning_message.message_type
@@ -536,8 +549,10 @@ class SimpleAnthropicStreamingInterface:
def __init__(
self,
requires_approval_tools: list = [],
run_id: str | None = None,
):
self.json_parser: JSONParser = PydanticJSONParser()
self.run_id = run_id
# Premake IDs for database writes
self.letta_message_id = Message.generate_id()
@@ -748,6 +763,7 @@ class SimpleAnthropicStreamingInterface:
tool_call=ToolCallDelta(name=self.tool_call_name, tool_call_id=self.tool_call_id),
date=datetime.now(timezone.utc).isoformat(),
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
else:
if prev_message_type and prev_message_type != "tool_call_message":
@@ -757,6 +773,7 @@ class SimpleAnthropicStreamingInterface:
tool_call=ToolCallDelta(name=self.tool_call_name, tool_call_id=self.tool_call_id),
date=datetime.now(timezone.utc).isoformat(),
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
prev_message_type = tool_call_msg.message_type
yield tool_call_msg
@@ -777,6 +794,7 @@ class SimpleAnthropicStreamingInterface:
hidden_reasoning=content.data,
date=datetime.now(timezone.utc).isoformat(),
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
self.reasoning_messages.append(hidden_reasoning_message)
@@ -800,6 +818,7 @@ class SimpleAnthropicStreamingInterface:
content=delta.text,
date=datetime.now(timezone.utc).isoformat(),
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
# self.assistant_messages.append(assistant_msg)
self.reasoning_messages.append(assistant_msg)
@@ -822,6 +841,7 @@ class SimpleAnthropicStreamingInterface:
tool_call=ToolCallDelta(name=self.tool_call_name, tool_call_id=self.tool_call_id, arguments=delta.partial_json),
date=datetime.now(timezone.utc).isoformat(),
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
else:
if prev_message_type and prev_message_type != "tool_call_message":
@@ -831,6 +851,7 @@ class SimpleAnthropicStreamingInterface:
tool_call=ToolCallDelta(name=self.tool_call_name, tool_call_id=self.tool_call_id, arguments=delta.partial_json),
date=datetime.now(timezone.utc).isoformat(),
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
yield tool_call_msg
@@ -850,6 +871,7 @@ class SimpleAnthropicStreamingInterface:
reasoning=delta.thinking,
date=datetime.now(timezone.utc).isoformat(),
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
self.reasoning_messages.append(reasoning_message)
prev_message_type = reasoning_message.message_type
@@ -871,6 +893,7 @@ class SimpleAnthropicStreamingInterface:
date=datetime.now(timezone.utc).isoformat(),
signature=delta.signature,
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
self.reasoning_messages.append(reasoning_message)
prev_message_type = reasoning_message.message_type

View File

@@ -76,11 +76,13 @@ class OpenAIStreamingInterface:
tools: Optional[list] = None,
put_inner_thoughts_in_kwarg: bool = True,
requires_approval_tools: list = [],
run_id: str | None = None,
):
self.use_assistant_message = use_assistant_message
self.assistant_message_tool_name = DEFAULT_MESSAGE_TOOL
self.assistant_message_tool_kwarg = DEFAULT_MESSAGE_TOOL_KWARG
self.put_inner_thoughts_in_kwarg = put_inner_thoughts_in_kwarg
self.run_id = run_id
self.optimistic_json_parser: OptimisticJSONParser = OptimisticJSONParser()
self.function_args_reader = JSONInnerThoughtsExtractor(wait_for_first_key=put_inner_thoughts_in_kwarg)
@@ -244,6 +246,7 @@ class OpenAIStreamingInterface:
state="omitted",
hidden_reasoning=None,
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
yield hidden_message
prev_message_type = hidden_message.message_type
@@ -283,6 +286,7 @@ class OpenAIStreamingInterface:
reasoning=updates_inner_thoughts,
# name=name,
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
prev_message_type = reasoning_message.message_type
yield reasoning_message
@@ -324,6 +328,7 @@ class OpenAIStreamingInterface:
tool_call_id=self.function_id_buffer,
),
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
else:
tool_call_msg = ToolCallMessage(
@@ -335,6 +340,7 @@ class OpenAIStreamingInterface:
tool_call_id=self.function_id_buffer,
),
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
prev_message_type = tool_call_msg.message_type
yield tool_call_msg
@@ -380,6 +386,7 @@ class OpenAIStreamingInterface:
date=datetime.now(timezone.utc),
content=extracted,
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
prev_message_type = assistant_message.message_type
yield assistant_message
@@ -405,6 +412,7 @@ class OpenAIStreamingInterface:
),
# name=name,
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
else:
tool_call_msg = ToolCallMessage(
@@ -417,6 +425,7 @@ class OpenAIStreamingInterface:
),
# name=name,
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
prev_message_type = tool_call_msg.message_type
yield tool_call_msg
@@ -438,6 +447,7 @@ class OpenAIStreamingInterface:
),
# name=name,
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
else:
tool_call_msg = ToolCallMessage(
@@ -450,6 +460,7 @@ class OpenAIStreamingInterface:
),
# name=name,
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
prev_message_type = tool_call_msg.message_type
yield tool_call_msg
@@ -470,7 +481,9 @@ class SimpleOpenAIStreamingInterface:
tools: Optional[list] = None,
requires_approval_tools: list = [],
model: str = None,
run_id: str | None = None,
):
self.run_id = run_id
# Premake IDs for database writes
self.letta_message_id = Message.generate_id()
@@ -565,6 +578,7 @@ class SimpleOpenAIStreamingInterface:
state="omitted",
hidden_reasoning=None,
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
self.content_messages.append(hidden_message)
prev_message_type = hidden_message.message_type
@@ -635,6 +649,7 @@ class SimpleOpenAIStreamingInterface:
content=[TextContent(text=message_delta.content)],
date=datetime.now(timezone.utc).isoformat(),
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
self.content_messages.append(assistant_msg)
prev_message_type = assistant_msg.message_type
@@ -683,6 +698,7 @@ class SimpleOpenAIStreamingInterface:
),
# name=name,
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
else:
tool_call_msg = ToolCallMessage(
@@ -695,6 +711,7 @@ class SimpleOpenAIStreamingInterface:
),
# name=name,
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
run_id=self.run_id,
)
prev_message_type = tool_call_msg.message_type
message_index += 1 # Increment for the next message
@@ -713,6 +730,7 @@ class SimpleOpenAIResponsesStreamingInterface:
tools: Optional[list] = None,
requires_approval_tools: list = [],
model: str = None,
run_id: str | None = None,
):
self.is_openai_proxy = is_openai_proxy
self.messages = messages
@@ -722,6 +740,7 @@ class SimpleOpenAIResponsesStreamingInterface:
self.tool_call_name = None
# ID responses used
self.message_id = None
self.run_id = run_id
# Premake IDs for database writes
self.letta_message_id = Message.generate_id()
@@ -872,6 +891,7 @@ class SimpleOpenAIResponsesStreamingInterface:
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
source="reasoner_model",
reasoning=concat_summary,
run_id=self.run_id,
)
else:
return
@@ -893,6 +913,7 @@ class SimpleOpenAIResponsesStreamingInterface:
arguments=arguments if arguments != "" else None,
tool_call_id=call_id,
),
run_id=self.run_id,
)
else:
yield ToolCallMessage(
@@ -904,6 +925,7 @@ class SimpleOpenAIResponsesStreamingInterface:
arguments=arguments if arguments != "" else None,
tool_call_id=call_id,
),
run_id=self.run_id,
)
elif isinstance(new_event_item, ResponseOutputMessage):
@@ -917,6 +939,7 @@ class SimpleOpenAIResponsesStreamingInterface:
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
date=datetime.now(timezone.utc),
content=content_item.text,
run_id=self.run_id,
)
else:
return
@@ -944,6 +967,7 @@ class SimpleOpenAIResponsesStreamingInterface:
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
source="reasoner_model",
reasoning=summary_text,
run_id=self.run_id,
)
# Reasoning summary streaming
@@ -962,6 +986,7 @@ class SimpleOpenAIResponsesStreamingInterface:
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
source="reasoner_model",
reasoning=delta,
run_id=self.run_id,
)
else:
return
@@ -1001,6 +1026,7 @@ class SimpleOpenAIResponsesStreamingInterface:
otid=Message.generate_otid_from_id(self.letta_message_id, message_index),
date=datetime.now(timezone.utc),
content=delta,
run_id=self.run_id,
)
else:
return
@@ -1032,6 +1058,7 @@ class SimpleOpenAIResponsesStreamingInterface:
arguments=delta,
tool_call_id=None,
),
run_id=self.run_id,
)
else:
yield ToolCallMessage(
@@ -1043,6 +1070,7 @@ class SimpleOpenAIResponsesStreamingInterface:
arguments=delta,
tool_call_id=None,
),
run_id=self.run_id,
)
# Function calls