fix: resource cleanup on background tasks (#5738)
copy over Jake's PR from OSS
This commit is contained in:
@@ -13,6 +13,8 @@ from sqlalchemy.exc import IntegrityError, OperationalError
|
||||
from starlette.responses import Response, StreamingResponse
|
||||
|
||||
from letta.agents.agent_loop import AgentLoop
|
||||
from letta.agents.base_agent_v2 import BaseAgentV2
|
||||
from letta.agents.letta_agent import LettaAgent
|
||||
from letta.agents.letta_agent_v2 import LettaAgentV2
|
||||
from letta.constants import DEFAULT_MAX_STEPS, DEFAULT_MESSAGE_TOOL, DEFAULT_MESSAGE_TOOL_KWARG, REDIS_RUN_ID_PREFIX
|
||||
from letta.data_sources.redis_client import get_redis_client
|
||||
@@ -1529,6 +1531,9 @@ async def _process_message_background(
|
||||
) -> None:
|
||||
"""Background task to process the message and update run status."""
|
||||
request_start_timestamp_ns = get_utc_timestamp_ns()
|
||||
agent_loop = None
|
||||
result = None
|
||||
|
||||
try:
|
||||
agent = await server.agent_manager.get_agent_by_id_async(
|
||||
agent_id, actor, include_relationships=["memory", "multi_agent_group", "sources", "tool_exec_environment_variables", "tools"]
|
||||
@@ -1606,6 +1611,41 @@ async def _process_message_background(
|
||||
update=RunUpdate(status=RunStatus.failed),
|
||||
actor=actor,
|
||||
)
|
||||
finally:
|
||||
# Critical: Explicit resource cleanup to prevent accumulation
|
||||
if agent_loop and result:
|
||||
await _cleanup_background_task_resources(agent_loop, result)
|
||||
|
||||
|
||||
async def _cleanup_background_task_resources(agent_loop: BaseAgentV2 | LettaAgent, result: StreamingResponse | LettaResponse) -> None:
    """
    Best-effort release of memory held by a finished background message task.

    Empties the agent loop's ``response_messages`` buffer in place (when the
    agent exposes one) and then forces a garbage-collection pass.

    This function cannot drop the caller's references to ``agent_loop`` or
    ``result``: ``del`` on a parameter only unbinds the local name, so those
    objects stay alive until the caller itself lets go of them. It also does
    not close database sessions or LLM clients.

    Args:
        agent_loop: Agent whose response buffer should be emptied. ``None`` is
            tolerated and treated as "nothing to clear".
        result: Response produced by the task. Accepted for interface
            compatibility; it is not inspected.

    Returns:
        None. Errors raised during cleanup are logged as warnings and
        swallowed so cleanup can never fail the background task.
    """
    import gc

    try:
        # The signature admits more than one agent class and this function cannot
        # see whether all of them define `response_messages`, so read it defensively
        # instead of letting an AttributeError skip the collection pass below.
        response_messages = getattr(agent_loop, "response_messages", None)
        if response_messages:
            # Clear in place so every holder of the buffer sees it emptied
            response_messages.clear()

        # Force a collection pass to reclaim unreachable reference cycles promptly
        gc.collect()
    except Exception as e:
        # Log the error but don't fail the background task
        logger.warning(f"Error during background task resource cleanup: {e}")
|
||||
|
||||
|
||||
@router.post(
|
||||
|
||||
Reference in New Issue
Block a user