From 94899b2be13163ca20cbb284e6d12aa42604b048 Mon Sep 17 00:00:00 2001 From: Matthew Zhou Date: Thu, 20 Feb 2025 11:37:52 -0800 Subject: [PATCH] fix: Fix chat completions streaming (#1078) --- .../rest_api/chat_completions_interface.py | 4 ++- letta/streaming_interface.py | 8 ++++-- tests/integration_test_chat_completions.py | 28 +++++++++++++------ 3 files changed, 29 insertions(+), 11 deletions(-) diff --git a/letta/server/rest_api/chat_completions_interface.py b/letta/server/rest_api/chat_completions_interface.py index 88fde203..9c03464e 100644 --- a/letta/server/rest_api/chat_completions_interface.py +++ b/letta/server/rest_api/chat_completions_interface.py @@ -153,7 +153,9 @@ class ChatCompletionsStreamingInterface(AgentChunkStreamingInterface): """No-op retained for interface compatibility.""" return - def process_chunk(self, chunk: ChatCompletionChunkResponse, message_id: str, message_date: datetime) -> None: + def process_chunk( + self, chunk: ChatCompletionChunkResponse, message_id: str, message_date: datetime, expect_reasoning_content: bool = False + ) -> None: """ Called externally with a ChatCompletionChunkResponse. Transforms it if necessary, then enqueues partial messages for streaming back. diff --git a/letta/streaming_interface.py b/letta/streaming_interface.py index 2949b94e..3d007cad 100644 --- a/letta/streaming_interface.py +++ b/letta/streaming_interface.py @@ -48,7 +48,9 @@ class AgentChunkStreamingInterface(ABC): raise NotImplementedError @abstractmethod - def process_chunk(self, chunk: ChatCompletionChunkResponse, message_id: str, message_date: datetime): + def process_chunk( + self, chunk: ChatCompletionChunkResponse, message_id: str, message_date: datetime, expect_reasoning_content: bool = False + ): """Process a streaming chunk from an OpenAI-compatible server""" raise NotImplementedError @@ -92,7 +94,9 @@ class StreamingCLIInterface(AgentChunkStreamingInterface): def _flush(self): pass - def process_chunk(self, chunk: ChatCompletionChunkResponse, message_id: str, message_date: datetime): + def process_chunk( + self, chunk: ChatCompletionChunkResponse, message_id: str, message_date: datetime, expect_reasoning_content: bool = False + ): assert len(chunk.choices) == 1, chunk message_delta = chunk.choices[0].delta diff --git a/tests/integration_test_chat_completions.py b/tests/integration_test_chat_completions.py index ef19bdb6..767456ca 100644 --- a/tests/integration_test_chat_completions.py +++ b/tests/integration_test_chat_completions.py @@ -120,9 +120,16 @@ def test_chat_completions_streaming(mock_e2b_api_key_none, client, agent, messag f"{client.base_url}/openai/{client.api_prefix}/chat/completions", request.model_dump(exclude_none=True), client.headers ) - chunks = list(response) - for idx, chunk in enumerate(chunks): - _assert_valid_chunk(chunk, idx, chunks) + try: + chunks = list(response) + assert len(chunks) > 1, "Streaming response did not return enough chunks (may have failed silently)." + + for idx, chunk in enumerate(chunks): + assert chunk, f"Empty chunk received at index {idx}." + print(chunk) + _assert_valid_chunk(chunk, idx, chunks) + except Exception as e: + pytest.fail(f"Streaming failed with exception: {e}") @pytest.mark.asyncio @@ -134,10 +141,15 @@ async def test_chat_completions_streaming_async(client, agent, message): async_client = AsyncOpenAI(base_url=f"{client.base_url}/openai/{client.api_prefix}", max_retries=0) stream = await async_client.chat.completions.create(**request.model_dump(exclude_none=True)) - async with stream: - async for chunk in stream: - if isinstance(chunk, ChatCompletionChunk): + received_chunks = 0 + try: + async with stream: + async for chunk in stream: + assert isinstance(chunk, ChatCompletionChunk), f"Unexpected chunk type: {type(chunk)}" assert chunk.choices, "Each ChatCompletionChunk should have at least one choice." assert chunk.choices[0].delta.content, f"Chunk at index 0 has no content: {chunk.model_dump_json(indent=4)}" - else: - pytest.fail(f"Unexpected chunk type: {chunk}") + received_chunks += 1 + except Exception as e: + pytest.fail(f"Streaming failed with exception: {e}") + + assert received_chunks > 1, "No valid streaming chunks were received."