diff --git a/letta/server/rest_api/routers/v1/agents.py b/letta/server/rest_api/routers/v1/agents.py
index 35a3fee1..cb751a0e 100644
--- a/letta/server/rest_api/routers/v1/agents.py
+++ b/letta/server/rest_api/routers/v1/agents.py
@@ -1811,7 +1811,7 @@ async def _process_message_background(

         await runs_manager.update_run_by_id_async(
             run_id=run_id,
-            update=RunUpdate(status=RunStatus.failed),
+            update=RunUpdate(status=RunStatus.failed, metadata={"error": str(e)}),
             actor=actor,
         )
     except Exception as e:
@@ -1821,7 +1821,7 @@ async def _process_message_background(

         await runs_manager.update_run_by_id_async(
             run_id=run_id,
-            update=RunUpdate(status=RunStatus.failed),
+            update=RunUpdate(status=RunStatus.failed, metadata={"error": str(e)}),
             actor=actor,
         )
     finally:
@@ -1953,13 +1953,15 @@ async def send_message_async(
             logger.error(f"Unhandled exception in background task for run {run.id}: {e}")
             from letta.services.run_manager import RunManager

+            error_str = str(e)
+
             async def update_failed_run():
                 runs_manager = RunManager()
                 from letta.schemas.enums import RunStatus

                 await runs_manager.update_run_by_id_async(
                     run_id=run.id,
-                    update=RunUpdate(status=RunStatus.failed),
+                    update=RunUpdate(status=RunStatus.failed, metadata={"error": error_str}),
                     actor=actor,
                 )

diff --git a/letta/services/streaming_service.py b/letta/services/streaming_service.py
index 12aca98b..ec8bf2d6 100644
--- a/letta/services/streaming_service.py
+++ b/letta/services/streaming_service.py
@@ -338,6 +338,7 @@ class StreamingService:
                     message="Stream ended unexpectedly without a terminal event.",
                     detail=None,
                 )
+                error_data = {"error": error_message.model_dump()}
                 yield f"data: {stop_reason.model_dump_json()}\n\n"
                 yield f"event: error\ndata: {error_message.model_dump_json()}\n\n"
                 yield "data: [DONE]\n\n"
@@ -362,6 +363,7 @@ class StreamingService:
                 message="The LLM request timed out. Please try again.",
                 detail=str(e),
             )
+            error_data = {"error": error_message.model_dump()}
             logger.error(f"Run {run_id} stopped with LLM timeout error: {e}, error_data: {error_message.model_dump()}")
             yield f"data: {stop_reason.model_dump_json()}\n\n"
             yield f"event: error\ndata: {error_message.model_dump_json()}\n\n"
@@ -376,6 +378,7 @@ class StreamingService:
                 message="Rate limit exceeded for LLM model provider. Please wait before making another request.",
                 detail=str(e),
             )
+            error_data = {"error": error_message.model_dump()}
             logger.warning(f"Run {run_id} stopped with LLM rate limit error: {e}, error_data: {error_message.model_dump()}")
             yield f"data: {stop_reason.model_dump_json()}\n\n"
             yield f"event: error\ndata: {error_message.model_dump_json()}\n\n"
@@ -383,7 +386,6 @@ class StreamingService:
             yield "data: [DONE]\n\n"
         except LLMAuthenticationError as e:
             run_status = RunStatus.failed
-            logger.warning(f"Run {run_id} stopped with LLM authentication error: {e}, error_data: {error_data}")
             stop_reason = LettaStopReason(stop_reason=StopReasonType.llm_api_error)
             error_message = LettaErrorMessage(
                 run_id=run_id,
@@ -391,13 +393,14 @@ class StreamingService:
                 message="Authentication failed with the LLM model provider.",
                 detail=str(e),
             )
+            error_data = {"error": error_message.model_dump()}
+            logger.warning(f"Run {run_id} stopped with LLM authentication error: {e}, error_data: {error_message.model_dump()}")
             yield f"data: {stop_reason.model_dump_json()}\n\n"
             yield f"event: error\ndata: {error_message.model_dump_json()}\n\n"
             # Send [DONE] marker to properly close the stream
             yield "data: [DONE]\n\n"
         except LLMError as e:
             run_status = RunStatus.failed
-            logger.error(f"Run {run_id} stopped with LLM error: {e}, error_data: {error_data}")
             stop_reason = LettaStopReason(stop_reason=StopReasonType.llm_api_error)
             error_message = LettaErrorMessage(
                 run_id=run_id,
@@ -405,13 +408,14 @@ class StreamingService:
                 message="An error occurred with the LLM request.",
                 detail=str(e),
             )
+            error_data = {"error": error_message.model_dump()}
+            logger.error(f"Run {run_id} stopped with LLM error: {e}, error_data: {error_message.model_dump()}")
             yield f"data: {stop_reason.model_dump_json()}\n\n"
             yield f"event: error\ndata: {error_message.model_dump_json()}\n\n"
             # Send [DONE] marker to properly close the stream
             yield "data: [DONE]\n\n"
         except Exception as e:
             run_status = RunStatus.failed
-            logger.error(f"Run {run_id} stopped with unknown error: {e}, error_data: {error_data}")
             stop_reason = LettaStopReason(stop_reason=StopReasonType.error)
             error_message = LettaErrorMessage(
                 run_id=run_id,
@@ -419,6 +423,8 @@ class StreamingService:
                 message="An unknown error occurred with the LLM streaming request.",
                 detail=str(e),
             )
+            error_data = {"error": error_message.model_dump()}
+            logger.error(f"Run {run_id} stopped with unknown error: {e}, error_data: {error_message.model_dump()}")
             yield f"data: {stop_reason.model_dump_json()}\n\n"
             yield f"event: error\ndata: {error_message.model_dump_json()}\n\n"
             # Send [DONE] marker to properly close the stream
diff --git a/tests/managers/test_cancellation.py b/tests/managers/test_cancellation.py
index 554b700f..804fd4ee 100644
--- a/tests/managers/test_cancellation.py
+++ b/tests/managers/test_cancellation.py
@@ -1453,3 +1453,304 @@ class TestEdgeCases:

         # Should still detect as cancelled
         assert result_2.stop_reason.stop_reason == "cancelled"
+
+
+class TestErrorDataPersistence:
+    """
+    Test that error data is properly stored in run metadata when runs fail.
+    This ensures errors can be debugged by inspecting the run's metadata field.
+    """
+
+    @pytest.mark.asyncio
+    async def test_error_data_stored_in_run_metadata_on_background_streaming_llm_error(
+        self,
+        server: SyncServer,
+        default_user,
+        test_agent_with_tool,
+    ):
+        """
+        Test that when a background streaming run fails due to an LLM error,
+        the error details are stored in the run's metadata field.
+
+        This test validates the fix for the issue where failed runs showed
+        empty metadata in the database, making it impossible to debug errors.
+
+        The test patches LettaAgentV3.stream to raise an LLMError, simulating
+        what happens when the LLM provider returns an error during streaming.
+
+        Verifies:
+        - Run status is set to 'failed'
+        - Run metadata contains 'error' key with error details
+        """
+        from unittest.mock import patch
+
+        from letta.agents.letta_agent_v3 import LettaAgentV3
+        from letta.errors import LLMError
+        from letta.services.streaming_service import StreamingService
+
+        # Create streaming service
+        streaming_service = StreamingService(server)
+
+        # Create the request WITHOUT background mode, for simplicity.
+        # Background streaming adds Redis complexity, so we test foreground streaming,
+        # which still exercises the same error handling in _create_error_aware_stream.
+        request = LettaStreamingRequest(
+            messages=[MessageCreate(role=MessageRole.user, content="Hello, please respond")],
+            max_steps=1,
+            stream_tokens=True,
+            background=False,
+        )
+
+        # Mock stream method that raises an error
+        async def mock_stream_raises_llm_error(*args, **kwargs):
+            raise LLMError("Simulated LLM error for testing")
+            yield  # Make it a generator
+
+        # Use patch to simulate the error during streaming
+        with patch.object(LettaAgentV3, "stream", mock_stream_raises_llm_error):
+            # Start the streaming request
+            run, stream_response = await streaming_service.create_agent_stream(
+                agent_id=test_agent_with_tool.id,
+                actor=default_user,
+                request=request,
+                run_type="test_error_persistence",
+            )
+
+            assert run is not None, "Run should be created"
+
+            # Consume the stream to trigger error handling
+            collected_chunks = []
+            async for chunk in stream_response.body_iterator:
+                collected_chunks.append(chunk)
+
+        # Give any async handling time to complete
+        await asyncio.sleep(0.2)
+
+        # Fetch the run from the database
+        fetched_run = await server.run_manager.get_run_by_id(run.id, actor=default_user)
+
+        # Verify the run status is failed
+        assert fetched_run.status == RunStatus.failed, f"Expected status 'failed', got '{fetched_run.status}'"
+
+        # Verify metadata contains error information
+        assert fetched_run.metadata is not None, (
+            f"Run metadata should not be None after error. "
+            f"Run ID: {run.id}, Status: {fetched_run.status}, Stop reason: {fetched_run.stop_reason}"
+        )
+        assert "error" in fetched_run.metadata, f"Run metadata should contain 'error' key, got: {fetched_run.metadata}"
+
+        error_info = fetched_run.metadata["error"]
+        # The error is stored as a dict from LettaErrorMessage.model_dump()
+        assert isinstance(error_info, dict), f"Error info should be a dict, got: {type(error_info)}"
+        assert "error_type" in error_info, f"Error info should contain 'error_type', got: {error_info}"
+        assert error_info["error_type"] == "llm_error", f"Expected error_type 'llm_error', got: {error_info['error_type']}"
+
+    @pytest.mark.asyncio
+    async def test_error_data_stored_on_streaming_timeout_error(
+        self,
+        server: SyncServer,
+        default_user,
+        test_agent_with_tool,
+    ):
+        """
+        Test that timeout errors during streaming store error data.
+
+        Verifies:
+        - Timeout errors are properly captured in run metadata
+        - Run can be queried from DB and error details are available
+        """
+        from unittest.mock import patch
+
+        from letta.agents.letta_agent_v3 import LettaAgentV3
+        from letta.errors import LLMTimeoutError
+        from letta.services.streaming_service import StreamingService
+
+        streaming_service = StreamingService(server)
+
+        request = LettaStreamingRequest(
+            messages=[MessageCreate(role=MessageRole.user, content="Hello")],
+            max_steps=1,
+            stream_tokens=True,
+            background=False,
+        )
+
+        async def mock_stream_raises_timeout(*args, **kwargs):
+            raise LLMTimeoutError("Request timed out after 30 seconds")
+            yield
+
+        with patch.object(LettaAgentV3, "stream", mock_stream_raises_timeout):
+            run, stream_response = await streaming_service.create_agent_stream(
+                agent_id=test_agent_with_tool.id,
+                actor=default_user,
+                request=request,
+                run_type="test_timeout_error",
+            )
+
+            # Consume the stream
+            async for _ in stream_response.body_iterator:
+                pass
+
+        await asyncio.sleep(0.2)
+
+        fetched_run = await server.run_manager.get_run_by_id(run.id, actor=default_user)
+
+        assert fetched_run.status == RunStatus.failed
+        assert fetched_run.metadata is not None, f"Run metadata should contain error info for run {run.id}"
+        assert "error" in fetched_run.metadata
+        assert fetched_run.metadata["error"]["error_type"] == "llm_timeout"
+
+    @pytest.mark.asyncio
+    async def test_error_data_stored_on_streaming_rate_limit_error(
+        self,
+        server: SyncServer,
+        default_user,
+        test_agent_with_tool,
+    ):
+        """
+        Test that rate limit errors during streaming store error data.
+
+        Verifies:
+        - Rate limit errors are properly captured in run metadata
+        """
+        from unittest.mock import patch
+
+        from letta.agents.letta_agent_v3 import LettaAgentV3
+        from letta.errors import LLMRateLimitError
+        from letta.services.streaming_service import StreamingService
+
+        streaming_service = StreamingService(server)
+
+        request = LettaStreamingRequest(
+            messages=[MessageCreate(role=MessageRole.user, content="Hello")],
+            max_steps=1,
+            stream_tokens=True,
+            background=False,
+        )
+
+        async def mock_stream_raises_rate_limit(*args, **kwargs):
+            raise LLMRateLimitError("Rate limit exceeded: 100 requests per minute")
+            yield
+
+        with patch.object(LettaAgentV3, "stream", mock_stream_raises_rate_limit):
+            run, stream_response = await streaming_service.create_agent_stream(
+                agent_id=test_agent_with_tool.id,
+                actor=default_user,
+                request=request,
+                run_type="test_rate_limit_error",
+            )
+
+            async for _ in stream_response.body_iterator:
+                pass
+
+        await asyncio.sleep(0.2)
+
+        fetched_run = await server.run_manager.get_run_by_id(run.id, actor=default_user)
+
+        assert fetched_run.status == RunStatus.failed
+        assert fetched_run.metadata is not None
+        assert "error" in fetched_run.metadata
+        assert fetched_run.metadata["error"]["error_type"] == "llm_rate_limit"
+
+    @pytest.mark.asyncio
+    async def test_error_data_stored_on_streaming_auth_error(
+        self,
+        server: SyncServer,
+        default_user,
+        test_agent_with_tool,
+    ):
+        """
+        Test that authentication errors during streaming store error data.
+
+        Verifies:
+        - Auth errors are properly captured in run metadata
+        """
+        from unittest.mock import patch
+
+        from letta.agents.letta_agent_v3 import LettaAgentV3
+        from letta.errors import LLMAuthenticationError
+        from letta.services.streaming_service import StreamingService
+
+        streaming_service = StreamingService(server)
+
+        request = LettaStreamingRequest(
+            messages=[MessageCreate(role=MessageRole.user, content="Hello")],
+            max_steps=1,
+            stream_tokens=True,
+            background=False,
+        )
+
+        async def mock_stream_raises_auth_error(*args, **kwargs):
+            raise LLMAuthenticationError("Invalid API key")
+            yield
+
+        with patch.object(LettaAgentV3, "stream", mock_stream_raises_auth_error):
+            run, stream_response = await streaming_service.create_agent_stream(
+                agent_id=test_agent_with_tool.id,
+                actor=default_user,
+                request=request,
+                run_type="test_auth_error",
+            )
+
+            async for _ in stream_response.body_iterator:
+                pass
+
+        await asyncio.sleep(0.2)
+
+        fetched_run = await server.run_manager.get_run_by_id(run.id, actor=default_user)
+
+        assert fetched_run.status == RunStatus.failed
+        assert fetched_run.metadata is not None
+        assert "error" in fetched_run.metadata
+        assert fetched_run.metadata["error"]["error_type"] == "llm_authentication"
+
+    @pytest.mark.asyncio
+    async def test_error_data_stored_on_generic_exception(
+        self,
+        server: SyncServer,
+        default_user,
+        test_agent_with_tool,
+    ):
+        """
+        Test that generic exceptions during streaming store error data.
+
+        Verifies:
+        - Generic exceptions result in error data being stored
+        - Error details are preserved in metadata with 'internal_error' type
+        """
+        from unittest.mock import patch
+
+        from letta.agents.letta_agent_v3 import LettaAgentV3
+        from letta.services.streaming_service import StreamingService
+
+        streaming_service = StreamingService(server)
+
+        request = LettaStreamingRequest(
+            messages=[MessageCreate(role=MessageRole.user, content="Hello")],
+            max_steps=1,
+            stream_tokens=True,
+            background=False,
+        )
+
+        async def mock_stream_raises_generic_error(*args, **kwargs):
+            raise RuntimeError("Unexpected internal error")
+            yield
+
+        with patch.object(LettaAgentV3, "stream", mock_stream_raises_generic_error):
+            run, stream_response = await streaming_service.create_agent_stream(
+                agent_id=test_agent_with_tool.id,
+                actor=default_user,
+                request=request,
+                run_type="test_generic_error",
+            )
+
+            async for _ in stream_response.body_iterator:
+                pass
+
+        await asyncio.sleep(0.2)
+
+        fetched_run = await server.run_manager.get_run_by_id(run.id, actor=default_user)
+
+        assert fetched_run.status == RunStatus.failed
+        assert fetched_run.metadata is not None, "Run metadata should contain error info for generic exception"
+        assert "error" in fetched_run.metadata
+        assert fetched_run.metadata["error"]["error_type"] == "internal_error"