diff --git a/letta/adapters/letta_llm_stream_adapter.py b/letta/adapters/letta_llm_stream_adapter.py index 9f17d5ae..1d3b64e4 100644 --- a/letta/adapters/letta_llm_stream_adapter.py +++ b/letta/adapters/letta_llm_stream_adapter.py @@ -25,8 +25,9 @@ class LettaLLMStreamAdapter(LettaLLMAdapter): specific streaming formats. """ - def __init__(self, llm_client: LLMClientBase, llm_config: LLMConfig) -> None: + def __init__(self, llm_client: LLMClientBase, llm_config: LLMConfig, run_id: str | None = None) -> None: super().__init__(llm_client, llm_config) + self.run_id = run_id self.interface: OpenAIStreamingInterface | AnthropicStreamingInterface | None = None async def invoke_llm( @@ -57,6 +58,7 @@ class LettaLLMStreamAdapter(LettaLLMAdapter): use_assistant_message=use_assistant_message, put_inner_thoughts_in_kwarg=self.llm_config.put_inner_thoughts_in_kwargs, requires_approval_tools=requires_approval_tools, + run_id=self.run_id, ) elif self.llm_config.model_endpoint_type == ProviderType.openai: # For non-v1 agents, always use Chat Completions streaming interface @@ -67,6 +69,7 @@ class LettaLLMStreamAdapter(LettaLLMAdapter): messages=messages, tools=tools, requires_approval_tools=requires_approval_tools, + run_id=self.run_id, ) else: raise ValueError(f"Streaming not supported for provider {self.llm_config.model_endpoint_type}") diff --git a/letta/adapters/simple_llm_stream_adapter.py b/letta/adapters/simple_llm_stream_adapter.py index 54fb1a51..97f65bc3 100644 --- a/letta/adapters/simple_llm_stream_adapter.py +++ b/letta/adapters/simple_llm_stream_adapter.py @@ -51,6 +51,7 @@ class SimpleLLMStreamAdapter(LettaLLMStreamAdapter): # NOTE: different self.interface = SimpleAnthropicStreamingInterface( requires_approval_tools=requires_approval_tools, + run_id=self.run_id, ) elif self.llm_config.model_endpoint_type == ProviderType.openai: # Decide interface based on payload shape @@ -63,6 +64,7 @@ class SimpleLLMStreamAdapter(LettaLLMStreamAdapter): messages=messages, tools=tools, requires_approval_tools=requires_approval_tools, + run_id=self.run_id, ) else: self.interface = SimpleOpenAIStreamingInterface( @@ -71,6 +73,7 @@ class SimpleLLMStreamAdapter(LettaLLMStreamAdapter): tools=tools, requires_approval_tools=requires_approval_tools, model=self.llm_config.model, + run_id=self.run_id, ) else: raise ValueError(f"Streaming not supported for provider {self.llm_config.model_endpoint_type}") diff --git a/letta/agents/letta_agent_v2.py b/letta/agents/letta_agent_v2.py index 380c9332..82ec1950 100644 --- a/letta/agents/letta_agent_v2.py +++ b/letta/agents/letta_agent_v2.py @@ -273,6 +273,7 @@ class LettaAgentV2(BaseAgentV2): llm_adapter = LettaLLMStreamAdapter( llm_client=self.llm_client, llm_config=self.agent_state.llm_config, + run_id=run_id, ) else: llm_adapter = LettaLLMRequestAdapter( diff --git a/letta/agents/letta_agent_v3.py b/letta/agents/letta_agent_v3.py index f7e6cef2..73436f7c 100644 --- a/letta/agents/letta_agent_v3.py +++ b/letta/agents/letta_agent_v3.py @@ -165,6 +165,7 @@ class LettaAgentV3(LettaAgentV2): llm_adapter = SimpleLLMStreamAdapter( llm_client=self.llm_client, llm_config=self.agent_state.llm_config, + run_id=run_id, ) else: llm_adapter = SimpleLLMRequestAdapter( diff --git a/letta/interfaces/anthropic_streaming_interface.py b/letta/interfaces/anthropic_streaming_interface.py index 7352aea7..86e82723 100644 --- a/letta/interfaces/anthropic_streaming_interface.py +++ b/letta/interfaces/anthropic_streaming_interface.py @@ -66,9 +66,11 @@ class AnthropicStreamingInterface: use_assistant_message: bool = False, put_inner_thoughts_in_kwarg: bool = False, requires_approval_tools: list = [], + run_id: str | None = None, ): self.json_parser: JSONParser = PydanticJSONParser() self.use_assistant_message = use_assistant_message + self.run_id = run_id # Premake IDs for database writes self.letta_message_id = Message.generate_id() @@ -280,6 +282,7 @@ class AnthropicStreamingInterface: tool_call=ToolCallDelta(name=self.tool_call_name, tool_call_id=self.tool_call_id), date=datetime.now(timezone.utc).isoformat(), otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) self.tool_call_buffer.append(tool_call_msg) elif isinstance(content, BetaThinkingBlock): @@ -295,6 +298,7 @@ class AnthropicStreamingInterface: hidden_reasoning=content.data, date=datetime.now(timezone.utc).isoformat(), otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) self.reasoning_messages.append(hidden_reasoning_message) prev_message_type = hidden_reasoning_message.message_type @@ -340,6 +344,7 @@ class AnthropicStreamingInterface: reasoning=self.accumulated_inner_thoughts[-1], date=datetime.now(timezone.utc).isoformat(), otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) self.reasoning_messages.append(reasoning_message) prev_message_type = reasoning_message.message_type @@ -367,6 +372,7 @@ class AnthropicStreamingInterface: reasoning=inner_thoughts_diff, date=datetime.now(timezone.utc).isoformat(), otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) self.reasoning_messages.append(reasoning_message) prev_message_type = reasoning_message.message_type @@ -397,6 +403,7 @@ class AnthropicStreamingInterface: tool_call_id=self.tool_call_id, arguments=tool_call_args, ), + run_id=self.run_id, ) prev_message_type = approval_msg.message_type yield approval_msg @@ -420,6 +427,7 @@ class AnthropicStreamingInterface: tool_call_id=self.tool_call_id, arguments=tool_call_args, ), + run_id=self.run_id, ) prev_message_type = tool_call_msg.message_type yield tool_call_msg @@ -440,6 +448,7 @@ class AnthropicStreamingInterface: content=[TextContent(text=send_message_diff)], date=datetime.now(timezone.utc).isoformat(), otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) prev_message_type = assistant_msg.message_type yield assistant_msg @@ -450,12 +459,14 @@ class AnthropicStreamingInterface: id=self.letta_message_id, tool_call=ToolCallDelta(name=self.tool_call_name, tool_call_id=self.tool_call_id, arguments=delta.partial_json), date=datetime.now(timezone.utc).isoformat(), + run_id=self.run_id, ) else: tool_call_msg = ToolCallMessage( id=self.letta_message_id, tool_call=ToolCallDelta(name=self.tool_call_name, tool_call_id=self.tool_call_id, arguments=delta.partial_json), date=datetime.now(timezone.utc).isoformat(), + run_id=self.run_id, ) if self.inner_thoughts_complete: if prev_message_type and prev_message_type != "tool_call_message": @@ -483,6 +494,7 @@ class AnthropicStreamingInterface: reasoning=delta.thinking, date=datetime.now(timezone.utc).isoformat(), otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) self.reasoning_messages.append(reasoning_message) prev_message_type = reasoning_message.message_type @@ -503,6 +515,7 @@ class AnthropicStreamingInterface: date=datetime.now(timezone.utc).isoformat(), signature=delta.signature, otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) self.reasoning_messages.append(reasoning_message) prev_message_type = reasoning_message.message_type @@ -536,8 +549,10 @@ class SimpleAnthropicStreamingInterface: def __init__( self, requires_approval_tools: list = [], + run_id: str | None = None, ): self.json_parser: JSONParser = PydanticJSONParser() + self.run_id = run_id # Premake IDs for database writes self.letta_message_id = Message.generate_id() @@ -748,6 +763,7 @@ class SimpleAnthropicStreamingInterface: tool_call=ToolCallDelta(name=self.tool_call_name, tool_call_id=self.tool_call_id), date=datetime.now(timezone.utc).isoformat(), otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) else: if prev_message_type and prev_message_type != "tool_call_message": @@ -757,6 +773,7 @@ class SimpleAnthropicStreamingInterface: tool_call=ToolCallDelta(name=self.tool_call_name, tool_call_id=self.tool_call_id), date=datetime.now(timezone.utc).isoformat(), otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) prev_message_type = tool_call_msg.message_type yield tool_call_msg @@ -777,6 +794,7 @@ class SimpleAnthropicStreamingInterface: hidden_reasoning=content.data, date=datetime.now(timezone.utc).isoformat(), otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) self.reasoning_messages.append(hidden_reasoning_message) @@ -800,6 +818,7 @@ class SimpleAnthropicStreamingInterface: content=delta.text, date=datetime.now(timezone.utc).isoformat(), otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) # self.assistant_messages.append(assistant_msg) self.reasoning_messages.append(assistant_msg) @@ -822,6 +841,7 @@ class SimpleAnthropicStreamingInterface: tool_call=ToolCallDelta(name=self.tool_call_name, tool_call_id=self.tool_call_id, arguments=delta.partial_json), date=datetime.now(timezone.utc).isoformat(), otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) else: if prev_message_type and prev_message_type != "tool_call_message": @@ -831,6 +851,7 @@ class SimpleAnthropicStreamingInterface: tool_call=ToolCallDelta(name=self.tool_call_name, tool_call_id=self.tool_call_id, arguments=delta.partial_json), date=datetime.now(timezone.utc).isoformat(), otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) yield tool_call_msg @@ -850,6 +871,7 @@ class SimpleAnthropicStreamingInterface: reasoning=delta.thinking, date=datetime.now(timezone.utc).isoformat(), otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) self.reasoning_messages.append(reasoning_message) prev_message_type = reasoning_message.message_type @@ -871,6 +893,7 @@ class SimpleAnthropicStreamingInterface: date=datetime.now(timezone.utc).isoformat(), signature=delta.signature, otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) self.reasoning_messages.append(reasoning_message) prev_message_type = reasoning_message.message_type diff --git a/letta/interfaces/openai_streaming_interface.py b/letta/interfaces/openai_streaming_interface.py index 37d166e7..904dcfe3 100644 --- a/letta/interfaces/openai_streaming_interface.py +++ b/letta/interfaces/openai_streaming_interface.py @@ -76,11 +76,13 @@ class OpenAIStreamingInterface: tools: Optional[list] = None, put_inner_thoughts_in_kwarg: bool = True, requires_approval_tools: list = [], + run_id: str | None = None, ): self.use_assistant_message = use_assistant_message self.assistant_message_tool_name = DEFAULT_MESSAGE_TOOL self.assistant_message_tool_kwarg = DEFAULT_MESSAGE_TOOL_KWARG self.put_inner_thoughts_in_kwarg = put_inner_thoughts_in_kwarg + self.run_id = run_id self.optimistic_json_parser: OptimisticJSONParser = OptimisticJSONParser() self.function_args_reader = JSONInnerThoughtsExtractor(wait_for_first_key=put_inner_thoughts_in_kwarg) @@ -244,6 +246,7 @@ class OpenAIStreamingInterface: state="omitted", hidden_reasoning=None, otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) yield hidden_message prev_message_type = hidden_message.message_type @@ -283,6 +286,7 @@ class OpenAIStreamingInterface: reasoning=updates_inner_thoughts, # name=name, otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) prev_message_type = reasoning_message.message_type yield reasoning_message @@ -324,6 +328,7 @@ class OpenAIStreamingInterface: tool_call_id=self.function_id_buffer, ), otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) else: tool_call_msg = ToolCallMessage( @@ -335,6 +340,7 @@ class OpenAIStreamingInterface: tool_call_id=self.function_id_buffer, ), otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) prev_message_type = tool_call_msg.message_type yield tool_call_msg @@ -380,6 +386,7 @@ class OpenAIStreamingInterface: date=datetime.now(timezone.utc), content=extracted, otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) prev_message_type = assistant_message.message_type yield assistant_message @@ -405,6 +412,7 @@ class OpenAIStreamingInterface: ), # name=name, otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) else: tool_call_msg = ToolCallMessage( @@ -417,6 +425,7 @@ class OpenAIStreamingInterface: ), # name=name, otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) prev_message_type = tool_call_msg.message_type yield tool_call_msg @@ -438,6 +447,7 @@ class OpenAIStreamingInterface: ), # name=name, otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) else: tool_call_msg = ToolCallMessage( @@ -450,6 +460,7 @@ class OpenAIStreamingInterface: ), # name=name, otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) prev_message_type = tool_call_msg.message_type yield tool_call_msg @@ -470,7 +481,9 @@ class SimpleOpenAIStreamingInterface: tools: Optional[list] = None, requires_approval_tools: list = [], model: str = None, + run_id: str | None = None, ): + self.run_id = run_id # Premake IDs for database writes self.letta_message_id = Message.generate_id() @@ -565,6 +578,7 @@ class SimpleOpenAIStreamingInterface: state="omitted", hidden_reasoning=None, otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) self.content_messages.append(hidden_message) prev_message_type = hidden_message.message_type @@ -635,6 +649,7 @@ class SimpleOpenAIStreamingInterface: content=[TextContent(text=message_delta.content)], date=datetime.now(timezone.utc).isoformat(), otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) self.content_messages.append(assistant_msg) prev_message_type = assistant_msg.message_type @@ -683,6 +698,7 @@ class SimpleOpenAIStreamingInterface: ), # name=name, otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) else: tool_call_msg = ToolCallMessage( @@ -695,6 +711,7 @@ class SimpleOpenAIStreamingInterface: ), # name=name, otid=Message.generate_otid_from_id(self.letta_message_id, message_index), + run_id=self.run_id, ) prev_message_type = tool_call_msg.message_type message_index += 1 # Increment for the next message @@ -713,6 +730,7 @@ class SimpleOpenAIResponsesStreamingInterface: tools: Optional[list] = None, requires_approval_tools: list = [], model: str = None, + run_id: str | None = None, ): self.is_openai_proxy = is_openai_proxy self.messages = messages @@ -722,6 +740,7 @@ class SimpleOpenAIResponsesStreamingInterface: self.tool_call_name = None # ID responses used self.message_id = None + self.run_id = run_id # Premake IDs for database writes self.letta_message_id = Message.generate_id() @@ -872,6 +891,7 @@ class SimpleOpenAIResponsesStreamingInterface: otid=Message.generate_otid_from_id(self.letta_message_id, message_index), source="reasoner_model", reasoning=concat_summary, + run_id=self.run_id, ) else: return @@ -893,6 +913,7 @@ class SimpleOpenAIResponsesStreamingInterface: arguments=arguments if arguments != "" else None, tool_call_id=call_id, ), + run_id=self.run_id, ) else: yield ToolCallMessage( @@ -904,6 +925,7 @@ class SimpleOpenAIResponsesStreamingInterface: arguments=arguments if arguments != "" else None, tool_call_id=call_id, ), + run_id=self.run_id, ) elif isinstance(new_event_item, ResponseOutputMessage): @@ -917,6 +939,7 @@ class SimpleOpenAIResponsesStreamingInterface: otid=Message.generate_otid_from_id(self.letta_message_id, message_index), date=datetime.now(timezone.utc), content=content_item.text, + run_id=self.run_id, ) else: return @@ -944,6 +967,7 @@ class SimpleOpenAIResponsesStreamingInterface: otid=Message.generate_otid_from_id(self.letta_message_id, message_index), source="reasoner_model", reasoning=summary_text, + run_id=self.run_id, ) # Reasoning summary streaming @@ -962,6 +986,7 @@ class SimpleOpenAIResponsesStreamingInterface: otid=Message.generate_otid_from_id(self.letta_message_id, message_index), source="reasoner_model", reasoning=delta, + run_id=self.run_id, ) else: return @@ -1001,6 +1026,7 @@ class SimpleOpenAIResponsesStreamingInterface: otid=Message.generate_otid_from_id(self.letta_message_id, message_index), date=datetime.now(timezone.utc), content=delta, + run_id=self.run_id, ) else: return @@ -1032,6 +1058,7 @@ class SimpleOpenAIResponsesStreamingInterface: arguments=delta, tool_call_id=None, ), + run_id=self.run_id, ) else: yield ToolCallMessage( @@ -1043,6 +1070,7 @@ class SimpleOpenAIResponsesStreamingInterface: arguments=delta, tool_call_id=None, ), + run_id=self.run_id, ) # Function calls