feat: Separate out streaming route (#2111)

This commit is contained in:
Matthew Zhou
2024-11-27 14:03:46 -08:00
committed by GitHub
parent cfb48a112f
commit 5a59d2ac42
16 changed files with 301 additions and 411 deletions

View File

@@ -271,9 +271,8 @@ class StreamingServerInterface(AgentChunkStreamingInterface):
self,
multi_step=True,
# Related to if we want to try and pass back the AssistantMessage as a special case function
use_assistant_message=False,
assistant_message_function_name=DEFAULT_MESSAGE_TOOL,
assistant_message_function_kwarg=DEFAULT_MESSAGE_TOOL_KWARG,
assistant_message_tool_name=DEFAULT_MESSAGE_TOOL,
assistant_message_tool_kwarg=DEFAULT_MESSAGE_TOOL_KWARG,
# Related to if we expect inner_thoughts to be in the kwargs
inner_thoughts_in_kwargs=True,
inner_thoughts_kwarg=INNER_THOUGHTS_KWARG,
@@ -287,7 +286,7 @@ class StreamingServerInterface(AgentChunkStreamingInterface):
self.streaming_chat_completion_mode_function_name = None # NOTE: sadly need to track state during stream
# If chat completion mode, we need a special stream reader to
# turn function argument to send_message into a normal text stream
self.streaming_chat_completion_json_reader = FunctionArgumentsStreamHandler(json_key=assistant_message_function_kwarg)
self.streaming_chat_completion_json_reader = FunctionArgumentsStreamHandler(json_key=assistant_message_tool_kwarg)
self._chunks = deque()
self._event = asyncio.Event() # Use an event to notify when chunks are available
@@ -300,9 +299,9 @@ class StreamingServerInterface(AgentChunkStreamingInterface):
self.multi_step_gen_indicator = MessageStreamStatus.done_generation
# Support for AssistantMessage
self.use_assistant_message = use_assistant_message
self.assistant_message_function_name = assistant_message_function_name
self.assistant_message_function_kwarg = assistant_message_function_kwarg
self.use_assistant_message = False # TODO: Remove this
self.assistant_message_tool_name = assistant_message_tool_name
self.assistant_message_tool_kwarg = assistant_message_tool_kwarg
# Support for inner_thoughts_in_kwargs
self.inner_thoughts_in_kwargs = inner_thoughts_in_kwargs
@@ -455,17 +454,14 @@ class StreamingServerInterface(AgentChunkStreamingInterface):
# If we get a "hit" on the special keyword we're looking for, we want to skip to the next chunk
# TODO I don't think this handles the function name in multi-pieces problem. Instead, we should probably reset the streaming_chat_completion_mode_function_name when we make this hit?
# if self.streaming_chat_completion_mode_function_name == self.assistant_message_function_name:
if tool_call.function.name == self.assistant_message_function_name:
# if self.streaming_chat_completion_mode_function_name == self.assistant_message_tool_name:
if tool_call.function.name == self.assistant_message_tool_name:
self.streaming_chat_completion_json_reader.reset()
# early exit to turn into content mode
return None
# if we're in the middle of parsing a send_message, we'll keep processing the JSON chunks
if (
tool_call.function.arguments
and self.streaming_chat_completion_mode_function_name == self.assistant_message_function_name
):
if tool_call.function.arguments and self.streaming_chat_completion_mode_function_name == self.assistant_message_tool_name:
# Strip out any extras tokens
cleaned_func_args = self.streaming_chat_completion_json_reader.process_json_chunk(tool_call.function.arguments)
# In the case that we just have the prefix of something, no message yet, then we should early exit to move to the next chunk
@@ -500,9 +496,6 @@ class StreamingServerInterface(AgentChunkStreamingInterface):
)
elif self.inner_thoughts_in_kwargs and tool_call.function:
if self.use_assistant_message:
raise NotImplementedError("inner_thoughts_in_kwargs with use_assistant_message not yet supported")
processed_chunk = None
if tool_call.function.name:
@@ -909,13 +902,13 @@ class StreamingServerInterface(AgentChunkStreamingInterface):
if (
self.use_assistant_message
and function_call.function.name == self.assistant_message_function_name
and self.assistant_message_function_kwarg in func_args
and function_call.function.name == self.assistant_message_tool_name
and self.assistant_message_tool_kwarg in func_args
):
processed_chunk = AssistantMessage(
id=msg_obj.id,
date=msg_obj.created_at,
assistant_message=func_args[self.assistant_message_function_kwarg],
assistant_message=func_args[self.assistant_message_tool_kwarg],
)
else:
processed_chunk = FunctionCallMessage(