feat: store run errors on streaming (#6573)

Sarah Wooders
2025-12-09 14:34:01 -08:00
committed by Caren Thomas
parent 848a73125c
commit fca5774795
3 changed files with 315 additions and 6 deletions

View File

@@ -1811,7 +1811,7 @@ async def _process_message_background(
await runs_manager.update_run_by_id_async(
run_id=run_id,
- update=RunUpdate(status=RunStatus.failed),
+ update=RunUpdate(status=RunStatus.failed, metadata={"error": str(e)}),
actor=actor,
)
except Exception as e:
@@ -1821,7 +1821,7 @@ async def _process_message_background(
await runs_manager.update_run_by_id_async(
run_id=run_id,
- update=RunUpdate(status=RunStatus.failed),
+ update=RunUpdate(status=RunStatus.failed, metadata={"error": str(e)}),
actor=actor,
)
finally:
@@ -1953,13 +1953,15 @@ async def send_message_async(
logger.error(f"Unhandled exception in background task for run {run.id}: {e}")
from letta.services.run_manager import RunManager
+ error_str = str(e)
async def update_failed_run():
runs_manager = RunManager()
from letta.schemas.enums import RunStatus
await runs_manager.update_run_by_id_async(
run_id=run.id,
- update=RunUpdate(status=RunStatus.failed),
+ update=RunUpdate(status=RunStatus.failed, metadata={"error": error_str}),
actor=actor,
)
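
All three call sites in this file make the same change: the exception is stringified and stored in the run's metadata alongside the failed status. A minimal sketch of the shared pattern, reusing the RunManager and RunStatus imports shown in the hunk above (the RunUpdate import path is a guess, for illustration only):

from letta.schemas.enums import RunStatus
from letta.schemas.run import RunUpdate  # assumed path, for illustration only
from letta.services.run_manager import RunManager

async def mark_run_failed(run_id: str, exc: Exception, actor) -> None:
    # Persist the failed status together with a stringified error so the
    # run's metadata field can be inspected later when debugging.
    runs_manager = RunManager()
    await runs_manager.update_run_by_id_async(
        run_id=run_id,
        update=RunUpdate(status=RunStatus.failed, metadata={"error": str(exc)}),
        actor=actor,
    )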

View File

@@ -338,6 +338,7 @@ class StreamingService:
message="Stream ended unexpectedly without a terminal event.",
detail=None,
)
+ error_data = {"error": error_message.model_dump()}
yield f"data: {stop_reason.model_dump_json()}\n\n"
yield f"event: error\ndata: {error_message.model_dump_json()}\n\n"
yield "data: [DONE]\n\n"
@@ -362,6 +363,7 @@ class StreamingService:
message="The LLM request timed out. Please try again.",
detail=str(e),
)
+ error_data = {"error": error_message.model_dump()}
logger.error(f"Run {run_id} stopped with LLM timeout error: {e}, error_data: {error_message.model_dump()}")
yield f"data: {stop_reason.model_dump_json()}\n\n"
yield f"event: error\ndata: {error_message.model_dump_json()}\n\n"
@@ -376,6 +378,7 @@ class StreamingService:
message="Rate limit exceeded for LLM model provider. Please wait before making another request.",
detail=str(e),
)
+ error_data = {"error": error_message.model_dump()}
logger.warning(f"Run {run_id} stopped with LLM rate limit error: {e}, error_data: {error_message.model_dump()}")
yield f"data: {stop_reason.model_dump_json()}\n\n"
yield f"event: error\ndata: {error_message.model_dump_json()}\n\n"
@@ -383,7 +386,6 @@ class StreamingService:
yield "data: [DONE]\n\n"
except LLMAuthenticationError as e:
run_status = RunStatus.failed
- logger.warning(f"Run {run_id} stopped with LLM authentication error: {e}, error_data: {error_data}")
stop_reason = LettaStopReason(stop_reason=StopReasonType.llm_api_error)
error_message = LettaErrorMessage(
run_id=run_id,
@@ -391,13 +393,14 @@ class StreamingService:
message="Authentication failed with the LLM model provider.",
detail=str(e),
)
+ error_data = {"error": error_message.model_dump()}
+ logger.warning(f"Run {run_id} stopped with LLM authentication error: {e}, error_data: {error_message.model_dump()}")
yield f"data: {stop_reason.model_dump_json()}\n\n"
yield f"event: error\ndata: {error_message.model_dump_json()}\n\n"
# Send [DONE] marker to properly close the stream
yield "data: [DONE]\n\n"
except LLMError as e:
run_status = RunStatus.failed
- logger.error(f"Run {run_id} stopped with LLM error: {e}, error_data: {error_data}")
stop_reason = LettaStopReason(stop_reason=StopReasonType.llm_api_error)
error_message = LettaErrorMessage(
run_id=run_id,
@@ -405,13 +408,14 @@ class StreamingService:
message="An error occurred with the LLM request.",
detail=str(e),
)
+ error_data = {"error": error_message.model_dump()}
+ logger.error(f"Run {run_id} stopped with LLM error: {e}, error_data: {error_message.model_dump()}")
yield f"data: {stop_reason.model_dump_json()}\n\n"
yield f"event: error\ndata: {error_message.model_dump_json()}\n\n"
# Send [DONE] marker to properly close the stream
yield "data: [DONE]\n\n"
except Exception as e:
run_status = RunStatus.failed
- logger.error(f"Run {run_id} stopped with unknown error: {e}, error_data: {error_data}")
stop_reason = LettaStopReason(stop_reason=StopReasonType.error)
error_message = LettaErrorMessage(
run_id=run_id,
@@ -419,6 +423,8 @@ class StreamingService:
message="An unknown error occurred with the LLM streaming request.",
detail=str(e),
)
+ error_data = {"error": error_message.model_dump()}
+ logger.error(f"Run {run_id} stopped with unknown error: {e}, error_data: {error_message.model_dump()}")
yield f"data: {stop_reason.model_dump_json()}\n\n"
yield f"event: error\ndata: {error_message.model_dump_json()}\n\n"
# Send [DONE] marker to properly close the stream
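
Each except branch above now binds error_data = {"error": error_message.model_dump()} once the error message is constructed, and the log statement moves after that construction so it can include the fresh payload. A minimal sketch of this error-aware SSE generator shape, with a plain dict standing in for LettaErrorMessage and the run-metadata persistence elided:

import json

async def error_aware_stream(run_id: str, inner):
    # Forward chunks from an inner event stream; on failure, capture the
    # error payload, emit an SSE error event, and still close with [DONE].
    error_data = None
    try:
        async for chunk in inner:
            yield f"data: {json.dumps(chunk)}\n\n"
    except Exception as e:
        error_message = {"run_id": run_id, "error_type": "internal_error", "detail": str(e)}
        error_data = {"error": error_message}  # later written to the run's metadata
        yield f"event: error\ndata: {json.dumps(error_message)}\n\n"
    # [DONE] marker so clients can close the connection cleanly
    yield "data: [DONE]\n\n"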

View File

@@ -1453,3 +1453,304 @@ class TestEdgeCases:
# Should still detect as cancelled
assert result_2.stop_reason.stop_reason == "cancelled"

class TestErrorDataPersistence:
"""
Test that error data is properly stored in run metadata when runs fail.
This ensures errors can be debugged by inspecting the run's metadata field.
"""

@pytest.mark.asyncio
async def test_error_data_stored_in_run_metadata_on_background_streaming_llm_error(
self,
server: SyncServer,
default_user,
test_agent_with_tool,
):
"""
Test that when a streaming run fails due to an LLM error,
the error details are stored in the run's metadata field.
This test validates the fix for the issue where failed runs showed
empty metadata in the database, making it impossible to debug errors.
The test patches LettaAgentV3.stream to raise an LLMError, simulating
what happens when the LLM provider returns an error during streaming.
Verifies:
- Run status is set to 'failed'
- Run metadata contains 'error' key with error details
"""
from unittest.mock import patch
from letta.agents.letta_agent_v3 import LettaAgentV3
from letta.errors import LLMError
from letta.services.streaming_service import StreamingService
# Create streaming service
streaming_service = StreamingService(server)
# Create the streaming request - foreground (background=False) for simplicity
# Background streaming adds Redis complexity, so we test foreground streaming
# which still exercises the same error handling in _create_error_aware_stream
request = LettaStreamingRequest(
messages=[MessageCreate(role=MessageRole.user, content="Hello, please respond")],
max_steps=1,
stream_tokens=True,
background=False,
)
# Mock stream method that raises an error
async def mock_stream_raises_llm_error(*args, **kwargs):
raise LLMError("Simulated LLM error for testing")
yield # Make it a generator
# Use patch to simulate the error during streaming
with patch.object(LettaAgentV3, "stream", mock_stream_raises_llm_error):
# Start the streaming request
run, stream_response = await streaming_service.create_agent_stream(
agent_id=test_agent_with_tool.id,
actor=default_user,
request=request,
run_type="test_error_persistence",
)
assert run is not None, "Run should be created"
# Consume the stream to trigger error handling
collected_chunks = []
async for chunk in stream_response.body_iterator:
collected_chunks.append(chunk)
# Give any async handling time to complete
await asyncio.sleep(0.2)
# Fetch the run from the database
fetched_run = await server.run_manager.get_run_by_id(run.id, actor=default_user)
# Verify the run status is failed
assert fetched_run.status == RunStatus.failed, f"Expected status 'failed', got '{fetched_run.status}'"
# Verify metadata contains error information
assert fetched_run.metadata is not None, (
f"Run metadata should not be None after error. "
f"Run ID: {run.id}, Status: {fetched_run.status}, Stop reason: {fetched_run.stop_reason}"
)
assert "error" in fetched_run.metadata, f"Run metadata should contain 'error' key, got: {fetched_run.metadata}"
error_info = fetched_run.metadata["error"]
# The error is stored as a dict from LettaErrorMessage.model_dump()
assert isinstance(error_info, dict), f"Error info should be a dict, got: {type(error_info)}"
assert "error_type" in error_info, f"Error info should contain 'error_type', got: {error_info}"
assert error_info["error_type"] == "llm_error", f"Expected error_type 'llm_error', got: {error_info['error_type']}"

@pytest.mark.asyncio
async def test_error_data_stored_on_streaming_timeout_error(
self,
server: SyncServer,
default_user,
test_agent_with_tool,
):
"""
Test that timeout errors during streaming store error data.
Verifies:
- Timeout errors are properly captured in run metadata
- Run can be queried from DB and error details are available
"""
from unittest.mock import patch
from letta.agents.letta_agent_v3 import LettaAgentV3
from letta.errors import LLMTimeoutError
from letta.services.streaming_service import StreamingService
streaming_service = StreamingService(server)
request = LettaStreamingRequest(
messages=[MessageCreate(role=MessageRole.user, content="Hello")],
max_steps=1,
stream_tokens=True,
background=False,
)
async def mock_stream_raises_timeout(*args, **kwargs):
raise LLMTimeoutError("Request timed out after 30 seconds")
yield  # unreachable, but makes this an async generator
with patch.object(LettaAgentV3, "stream", mock_stream_raises_timeout):
run, stream_response = await streaming_service.create_agent_stream(
agent_id=test_agent_with_tool.id,
actor=default_user,
request=request,
run_type="test_timeout_error",
)
# Consume the stream
async for _ in stream_response.body_iterator:
pass
await asyncio.sleep(0.2)
fetched_run = await server.run_manager.get_run_by_id(run.id, actor=default_user)
assert fetched_run.status == RunStatus.failed
assert fetched_run.metadata is not None, f"Run metadata should contain error info for run {run.id}"
assert "error" in fetched_run.metadata
assert fetched_run.metadata["error"]["error_type"] == "llm_timeout"

@pytest.mark.asyncio
async def test_error_data_stored_on_streaming_rate_limit_error(
self,
server: SyncServer,
default_user,
test_agent_with_tool,
):
"""
Test that rate limit errors during streaming store error data.
Verifies:
- Rate limit errors are properly captured in run metadata
"""
from unittest.mock import patch
from letta.agents.letta_agent_v3 import LettaAgentV3
from letta.errors import LLMRateLimitError
from letta.services.streaming_service import StreamingService
streaming_service = StreamingService(server)
request = LettaStreamingRequest(
messages=[MessageCreate(role=MessageRole.user, content="Hello")],
max_steps=1,
stream_tokens=True,
background=False,
)
async def mock_stream_raises_rate_limit(*args, **kwargs):
raise LLMRateLimitError("Rate limit exceeded: 100 requests per minute")
yield  # unreachable, but makes this an async generator
with patch.object(LettaAgentV3, "stream", mock_stream_raises_rate_limit):
run, stream_response = await streaming_service.create_agent_stream(
agent_id=test_agent_with_tool.id,
actor=default_user,
request=request,
run_type="test_rate_limit_error",
)
async for _ in stream_response.body_iterator:
pass
await asyncio.sleep(0.2)
fetched_run = await server.run_manager.get_run_by_id(run.id, actor=default_user)
assert fetched_run.status == RunStatus.failed
assert fetched_run.metadata is not None
assert "error" in fetched_run.metadata
assert fetched_run.metadata["error"]["error_type"] == "llm_rate_limit"

@pytest.mark.asyncio
async def test_error_data_stored_on_streaming_auth_error(
self,
server: SyncServer,
default_user,
test_agent_with_tool,
):
"""
Test that authentication errors during streaming store error data.
Verifies:
- Auth errors are properly captured in run metadata
"""
from unittest.mock import patch
from letta.agents.letta_agent_v3 import LettaAgentV3
from letta.errors import LLMAuthenticationError
from letta.services.streaming_service import StreamingService
streaming_service = StreamingService(server)
request = LettaStreamingRequest(
messages=[MessageCreate(role=MessageRole.user, content="Hello")],
max_steps=1,
stream_tokens=True,
background=False,
)
async def mock_stream_raises_auth_error(*args, **kwargs):
raise LLMAuthenticationError("Invalid API key")
yield  # unreachable, but makes this an async generator
with patch.object(LettaAgentV3, "stream", mock_stream_raises_auth_error):
run, stream_response = await streaming_service.create_agent_stream(
agent_id=test_agent_with_tool.id,
actor=default_user,
request=request,
run_type="test_auth_error",
)
async for _ in stream_response.body_iterator:
pass
await asyncio.sleep(0.2)
fetched_run = await server.run_manager.get_run_by_id(run.id, actor=default_user)
assert fetched_run.status == RunStatus.failed
assert fetched_run.metadata is not None
assert "error" in fetched_run.metadata
assert fetched_run.metadata["error"]["error_type"] == "llm_authentication"

@pytest.mark.asyncio
async def test_error_data_stored_on_generic_exception(
self,
server: SyncServer,
default_user,
test_agent_with_tool,
):
"""
Test that generic exceptions during streaming store error data.
Verifies:
- Generic exceptions result in error data being stored
- Error details are preserved in metadata with 'internal_error' type
"""
from unittest.mock import patch
from letta.agents.letta_agent_v3 import LettaAgentV3
from letta.services.streaming_service import StreamingService
streaming_service = StreamingService(server)
request = LettaStreamingRequest(
messages=[MessageCreate(role=MessageRole.user, content="Hello")],
max_steps=1,
stream_tokens=True,
background=False,
)
async def mock_stream_raises_generic_error(*args, **kwargs):
raise RuntimeError("Unexpected internal error")
yield  # unreachable, but makes this an async generator
with patch.object(LettaAgentV3, "stream", mock_stream_raises_generic_error):
run, stream_response = await streaming_service.create_agent_stream(
agent_id=test_agent_with_tool.id,
actor=default_user,
request=request,
run_type="test_generic_error",
)
async for _ in stream_response.body_iterator:
pass
await asyncio.sleep(0.2)
fetched_run = await server.run_manager.get_run_by_id(run.id, actor=default_user)
assert fetched_run.status == RunStatus.failed
assert fetched_run.metadata is not None, "Run metadata should contain error info for generic exception"
assert "error" in fetched_run.metadata
assert fetched_run.metadata["error"]["error_type"] == "internal_error"
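
With the metadata populated, a failed run can be inspected after the fact. A short usage sketch against the run-manager API the tests exercise; explain_failure is a hypothetical helper, and note that the streaming path stores a dict from LettaErrorMessage.model_dump() while the background-task path stores a plain string:

from letta.schemas.enums import RunStatus

async def explain_failure(server, run_id: str, actor):
    # Fetch the run and surface whatever error detail was stored on failure.
    run = await server.run_manager.get_run_by_id(run_id, actor=actor)
    if run.status != RunStatus.failed or not run.metadata:
        return None
    error_info = run.metadata.get("error")
    if isinstance(error_info, dict):
        # Streaming failures: LettaErrorMessage.model_dump()
        return f"{error_info.get('error_type')}: {error_info.get('message')}"
    # Background-task failures: str(e)
    return str(error_info) if error_info is not None else None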