feat: migrate sleeptime to new agent loop (#4485)
* feat: migrate sleeptime to new agent loop * only load agent state when needed
This commit is contained in:
@@ -213,7 +213,7 @@ class LettaAgentV2(BaseAgentV2):
|
||||
self,
|
||||
input_messages: list[MessageCreate],
|
||||
max_steps: int = DEFAULT_MAX_STEPS,
|
||||
stream_tokens: bool = True,
|
||||
stream_tokens: bool = False,
|
||||
run_id: str | None = None,
|
||||
use_assistant_message: bool = True,
|
||||
include_return_message_types: list[MessageType] | None = None,
|
||||
|
||||
225
letta/groups/sleeptime_multi_agent_v3.py
Normal file
225
letta/groups/sleeptime_multi_agent_v3.py
Normal file
@@ -0,0 +1,225 @@
|
||||
import asyncio
|
||||
from collections.abc import AsyncGenerator
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from letta.agents.letta_agent_v2 import LettaAgentV2
|
||||
from letta.constants import DEFAULT_MAX_STEPS
|
||||
from letta.groups.helpers import stringify_message
|
||||
from letta.schemas.agent import AgentState
|
||||
from letta.schemas.enums import JobStatus
|
||||
from letta.schemas.group import Group, ManagerType
|
||||
from letta.schemas.job import JobUpdate
|
||||
from letta.schemas.letta_message import MessageType
|
||||
from letta.schemas.letta_message_content import TextContent
|
||||
from letta.schemas.letta_response import LettaResponse
|
||||
from letta.schemas.message import Message, MessageCreate
|
||||
from letta.schemas.run import Run
|
||||
from letta.schemas.user import User
|
||||
from letta.services.group_manager import GroupManager
|
||||
|
||||
|
||||
class SleeptimeMultiAgentV3(LettaAgentV2):
|
||||
def __init__(
|
||||
self,
|
||||
agent_state: AgentState,
|
||||
actor: User,
|
||||
group: Group,
|
||||
):
|
||||
super().__init__(agent_state, actor)
|
||||
assert group.manager_type == ManagerType.sleeptime, f"Expected group type to be 'sleeptime', got {group.manager_type}"
|
||||
self.group = group
|
||||
self.run_ids = []
|
||||
|
||||
# Additional manager classes
|
||||
self.group_manager = GroupManager()
|
||||
|
||||
async def step(
|
||||
self,
|
||||
input_messages: list[MessageCreate],
|
||||
max_steps: int = DEFAULT_MAX_STEPS,
|
||||
run_id: str | None = None,
|
||||
use_assistant_message: bool = False,
|
||||
include_return_message_types: list[MessageType] | None = None,
|
||||
request_start_timestamp_ns: int | None = None,
|
||||
) -> LettaResponse:
|
||||
self.run_ids = []
|
||||
|
||||
for i in range(len(input_messages)):
|
||||
input_messages[i].group_id = self.group.id
|
||||
|
||||
response = await super().step(
|
||||
input_messages=input_messages,
|
||||
max_steps=max_steps,
|
||||
run_id=run_id,
|
||||
use_assistant_message=use_assistant_message,
|
||||
include_return_message_types=include_return_message_types,
|
||||
request_start_timestamp_ns=request_start_timestamp_ns,
|
||||
)
|
||||
|
||||
await self.run_sleeptime_agents(use_assistant_message=use_assistant_message)
|
||||
|
||||
response.usage.run_ids = self.run_ids
|
||||
return response
|
||||
|
||||
async def stream(
|
||||
self,
|
||||
input_messages: list[MessageCreate],
|
||||
max_steps: int = DEFAULT_MAX_STEPS,
|
||||
stream_tokens: bool = True,
|
||||
run_id: str | None = None,
|
||||
use_assistant_message: bool = True,
|
||||
request_start_timestamp_ns: int | None = None,
|
||||
include_return_message_types: list[MessageType] | None = None,
|
||||
) -> AsyncGenerator[str, None]:
|
||||
self.run_ids = []
|
||||
|
||||
for i in range(len(input_messages)):
|
||||
input_messages[i].group_id = self.group.id
|
||||
|
||||
# Perform foreground agent step
|
||||
async for chunk in super().stream(
|
||||
input_messages=input_messages,
|
||||
max_steps=max_steps,
|
||||
stream_tokens=stream_tokens,
|
||||
run_id=run_id,
|
||||
use_assistant_message=use_assistant_message,
|
||||
include_return_message_types=include_return_message_types,
|
||||
request_start_timestamp_ns=request_start_timestamp_ns,
|
||||
):
|
||||
yield chunk
|
||||
|
||||
await self.run_sleeptime_agents(use_assistant_message=use_assistant_message)
|
||||
|
||||
async def run_sleeptime_agents(self, use_assistant_message: bool = True):
|
||||
# Get response messages
|
||||
last_response_messages = self.response_messages
|
||||
|
||||
# Update turns counter
|
||||
turns_counter = None
|
||||
if self.group.sleeptime_agent_frequency is not None and self.group.sleeptime_agent_frequency > 0:
|
||||
turns_counter = await self.group_manager.bump_turns_counter_async(group_id=self.group.id, actor=self.actor)
|
||||
|
||||
# Perform participant steps
|
||||
if self.group.sleeptime_agent_frequency is None or (
|
||||
turns_counter is not None and turns_counter % self.group.sleeptime_agent_frequency == 0
|
||||
):
|
||||
last_processed_message_id = await self.group_manager.get_last_processed_message_id_and_update_async(
|
||||
group_id=self.group.id, last_processed_message_id=last_response_messages[-1].id, actor=self.actor
|
||||
)
|
||||
for sleeptime_agent_id in self.group.agent_ids:
|
||||
try:
|
||||
sleeptime_run_id = await self._issue_background_task(
|
||||
sleeptime_agent_id,
|
||||
last_response_messages,
|
||||
last_processed_message_id,
|
||||
use_assistant_message,
|
||||
)
|
||||
self.run_ids.append(sleeptime_run_id)
|
||||
except Exception as e:
|
||||
# Individual task failures
|
||||
print(f"Sleeptime agent processing failed: {e!s}")
|
||||
raise e
|
||||
|
||||
async def _issue_background_task(
|
||||
self,
|
||||
sleeptime_agent_id: str,
|
||||
response_messages: list[Message],
|
||||
last_processed_message_id: str,
|
||||
use_assistant_message: bool = True,
|
||||
) -> str:
|
||||
run = Run(
|
||||
user_id=self.actor.id,
|
||||
status=JobStatus.created,
|
||||
metadata={
|
||||
"job_type": "sleeptime_agent_send_message_async", # is this right?
|
||||
"agent_id": sleeptime_agent_id,
|
||||
},
|
||||
)
|
||||
run = await self.job_manager.create_job_async(pydantic_job=run, actor=self.actor)
|
||||
|
||||
asyncio.create_task(
|
||||
self._participant_agent_step(
|
||||
foreground_agent_id=self.agent_state.id,
|
||||
sleeptime_agent_id=sleeptime_agent_id,
|
||||
response_messages=response_messages,
|
||||
last_processed_message_id=last_processed_message_id,
|
||||
run_id=run.id,
|
||||
use_assistant_message=use_assistant_message,
|
||||
)
|
||||
)
|
||||
return run.id
|
||||
|
||||
async def _participant_agent_step(
|
||||
self,
|
||||
foreground_agent_id: str,
|
||||
sleeptime_agent_id: str,
|
||||
response_messages: list[Message],
|
||||
last_processed_message_id: str,
|
||||
run_id: str,
|
||||
use_assistant_message: bool = True,
|
||||
) -> LettaResponse:
|
||||
try:
|
||||
# Update job status
|
||||
job_update = JobUpdate(status=JobStatus.running)
|
||||
await self.job_manager.update_job_by_id_async(job_id=run_id, job_update=job_update, actor=self.actor)
|
||||
|
||||
# Create conversation transcript
|
||||
prior_messages = []
|
||||
if self.group.sleeptime_agent_frequency:
|
||||
try:
|
||||
prior_messages = await self.message_manager.list_messages_for_agent_async(
|
||||
agent_id=foreground_agent_id,
|
||||
actor=self.actor,
|
||||
after=last_processed_message_id,
|
||||
before=response_messages[0].id,
|
||||
)
|
||||
except Exception:
|
||||
pass # continue with just latest messages
|
||||
|
||||
transcript_summary = [stringify_message(message) for message in prior_messages + response_messages]
|
||||
transcript_summary = [summary for summary in transcript_summary if summary is not None]
|
||||
message_text = "\n".join(transcript_summary)
|
||||
|
||||
sleeptime_agent_messages = [
|
||||
MessageCreate(
|
||||
role="user",
|
||||
content=[TextContent(text=message_text)],
|
||||
id=Message.generate_id(),
|
||||
agent_id=sleeptime_agent_id,
|
||||
group_id=self.group.id,
|
||||
)
|
||||
]
|
||||
|
||||
# Load sleeptime agent
|
||||
sleeptime_agent_state = await self.agent_manager.get_agent_by_id_async(agent_id=sleeptime_agent_id)
|
||||
sleeptime_agent = LettaAgentV2(
|
||||
agent_state=sleeptime_agent_state,
|
||||
actor=self.actor,
|
||||
)
|
||||
|
||||
# Perform sleeptime agent step
|
||||
result = await sleeptime_agent.step(
|
||||
input_messages=sleeptime_agent_messages,
|
||||
run_id=run_id,
|
||||
use_assistant_message=use_assistant_message,
|
||||
)
|
||||
|
||||
# Update job status
|
||||
job_update = JobUpdate(
|
||||
status=JobStatus.completed,
|
||||
completed_at=datetime.now(timezone.utc).replace(tzinfo=None),
|
||||
metadata={
|
||||
"result": result.model_dump(mode="json"),
|
||||
"agent_id": sleeptime_agent.id,
|
||||
},
|
||||
)
|
||||
await self.job_manager.update_job_by_id_async(job_id=run_id, job_update=job_update, actor=self.actor)
|
||||
return result
|
||||
except Exception as e:
|
||||
job_update = JobUpdate(
|
||||
status=JobStatus.failed,
|
||||
completed_at=datetime.now(timezone.utc).replace(tzinfo=None),
|
||||
metadata={"error": str(e)},
|
||||
)
|
||||
await self.job_manager.update_job_by_id_async(job_id=run_id, job_update=job_update, actor=self.actor)
|
||||
raise
|
||||
@@ -1,85 +1,91 @@
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import delete
|
||||
import requests
|
||||
from dotenv import load_dotenv
|
||||
from letta_client import Letta
|
||||
from letta_client.core.api_error import ApiError
|
||||
|
||||
from letta.config import LettaConfig
|
||||
from letta.constants import DEFAULT_HUMAN
|
||||
from letta.groups.sleeptime_multi_agent_v2 import SleeptimeMultiAgentV2
|
||||
from letta.orm import Provider, ProviderTrace, Step
|
||||
from letta.orm.errors import NoResultFound
|
||||
from letta.schemas.agent import CreateAgent
|
||||
from letta.schemas.block import CreateBlock
|
||||
from letta.schemas.enums import JobStatus, JobType, ToolRuleType
|
||||
from letta.schemas.group import GroupUpdate, ManagerType, SleeptimeManagerUpdate
|
||||
from letta.schemas.group import ManagerType, SleeptimeManagerUpdate
|
||||
from letta.schemas.message import MessageCreate
|
||||
from letta.schemas.run import Run
|
||||
from letta.server.db import db_registry
|
||||
from letta.server.server import SyncServer
|
||||
from letta.utils import get_human_text, get_persona_text
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def server():
|
||||
config = LettaConfig.load()
|
||||
print("CONFIG PATH", config.config_path)
|
||||
def server_url() -> str:
|
||||
"""
|
||||
Provides the URL for the Letta server.
|
||||
If LETTA_SERVER_URL is not set, starts the server in a background thread
|
||||
and polls until it's accepting connections.
|
||||
"""
|
||||
|
||||
config.save()
|
||||
def _run_server() -> None:
|
||||
load_dotenv()
|
||||
from letta.server.rest_api.app import start_server
|
||||
|
||||
server = SyncServer()
|
||||
return server
|
||||
start_server(debug=True)
|
||||
|
||||
url: str = os.getenv("LETTA_SERVER_URL", "http://localhost:8283")
|
||||
|
||||
if not os.getenv("LETTA_SERVER_URL"):
|
||||
thread = threading.Thread(target=_run_server, daemon=True)
|
||||
thread.start()
|
||||
|
||||
# Poll until the server is up (or timeout)
|
||||
timeout_seconds = 30
|
||||
deadline = time.time() + timeout_seconds
|
||||
while time.time() < deadline:
|
||||
try:
|
||||
resp = requests.get(url + "/v1/health")
|
||||
if resp.status_code < 500:
|
||||
break
|
||||
except requests.exceptions.RequestException:
|
||||
pass
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
raise RuntimeError(f"Could not reach {url} within {timeout_seconds}s")
|
||||
|
||||
return url
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def org_id(server):
|
||||
org = server.organization_manager.create_default_organization()
|
||||
|
||||
yield org.id
|
||||
|
||||
# cleanup
|
||||
with db_registry.session() as session:
|
||||
session.execute(delete(ProviderTrace))
|
||||
session.execute(delete(Step))
|
||||
session.execute(delete(Provider))
|
||||
session.commit()
|
||||
server.organization_manager.delete_organization_by_id(org.id)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def actor(server, org_id):
|
||||
user = server.user_manager.create_default_user()
|
||||
yield user
|
||||
|
||||
# cleanup
|
||||
server.user_manager.delete_user_by_id(user.id)
|
||||
def client(server_url: str) -> Letta:
|
||||
"""
|
||||
Creates and returns a synchronous Letta REST client for testing.
|
||||
"""
|
||||
client_instance = Letta(base_url=server_url)
|
||||
yield client_instance
|
||||
|
||||
|
||||
@pytest.mark.flaky(max_runs=3)
|
||||
@pytest.mark.asyncio(loop_scope="module")
|
||||
async def test_sleeptime_group_chat(server, actor):
|
||||
async def test_sleeptime_group_chat(client):
|
||||
# 0. Refresh base tools
|
||||
server.tool_manager.upsert_base_tools(actor=actor)
|
||||
client.tools.upsert_base_tools()
|
||||
|
||||
# 1. Create sleeptime agent
|
||||
main_agent = server.create_agent(
|
||||
request=CreateAgent(
|
||||
name="main_agent",
|
||||
memory_blocks=[
|
||||
CreateBlock(
|
||||
label="persona",
|
||||
value="You are a personal assistant that helps users with requests.",
|
||||
),
|
||||
CreateBlock(
|
||||
label="human",
|
||||
value="My favorite plant is the fiddle leaf\nMy favorite color is lavender",
|
||||
),
|
||||
],
|
||||
# model="openai/gpt-4o-mini",
|
||||
model="anthropic/claude-3-5-sonnet-20240620",
|
||||
embedding="openai/text-embedding-3-small",
|
||||
enable_sleeptime=True,
|
||||
),
|
||||
actor=actor,
|
||||
main_agent = client.agents.create(
|
||||
name="main_agent",
|
||||
memory_blocks=[
|
||||
CreateBlock(
|
||||
label="persona",
|
||||
value="You are a personal assistant that helps users with requests.",
|
||||
),
|
||||
CreateBlock(
|
||||
label="human",
|
||||
value="My favorite plant is the fiddle leaf\nMy favorite color is lavender",
|
||||
),
|
||||
],
|
||||
model="anthropic/claude-3-5-sonnet-20240620",
|
||||
embedding="openai/text-embedding-3-small",
|
||||
enable_sleeptime=True,
|
||||
)
|
||||
|
||||
assert main_agent.enable_sleeptime == True
|
||||
@@ -89,14 +95,11 @@ async def test_sleeptime_group_chat(server, actor):
|
||||
assert "archival_memory_insert" not in main_agent_tools
|
||||
|
||||
# 2. Override frequency for test
|
||||
group = await server.group_manager.modify_group_async(
|
||||
group = client.groups.modify(
|
||||
group_id=main_agent.multi_agent_group.id,
|
||||
group_update=GroupUpdate(
|
||||
manager_config=SleeptimeManagerUpdate(
|
||||
sleeptime_agent_frequency=2,
|
||||
),
|
||||
manager_config=SleeptimeManagerUpdate(
|
||||
sleeptime_agent_frequency=2,
|
||||
),
|
||||
actor=actor,
|
||||
)
|
||||
|
||||
assert group.manager_type == ManagerType.sleeptime
|
||||
@@ -105,14 +108,14 @@ async def test_sleeptime_group_chat(server, actor):
|
||||
|
||||
# 3. Verify shared blocks
|
||||
sleeptime_agent_id = group.agent_ids[0]
|
||||
shared_block = server.agent_manager.get_block_with_label(agent_id=main_agent.id, block_label="human", actor=actor)
|
||||
agents = await server.block_manager.get_agents_for_block_async(block_id=shared_block.id, actor=actor)
|
||||
shared_block = client.agents.blocks.retrieve(agent_id=main_agent.id, block_label="human")
|
||||
agents = client.blocks.agents.list(block_id=shared_block.id)
|
||||
assert len(agents) == 2
|
||||
assert sleeptime_agent_id in [agent.id for agent in agents]
|
||||
assert main_agent.id in [agent.id for agent in agents]
|
||||
|
||||
# 4 Verify sleeptime agent tools
|
||||
sleeptime_agent = server.agent_manager.get_agent_by_id(agent_id=sleeptime_agent_id, actor=actor)
|
||||
sleeptime_agent = client.agents.retrieve(agent_id=sleeptime_agent_id)
|
||||
sleeptime_agent_tools = [tool.name for tool in sleeptime_agent.tools]
|
||||
assert "memory_rethink" in sleeptime_agent_tools
|
||||
assert "memory_finish_edits" in sleeptime_agent_tools
|
||||
@@ -132,136 +135,9 @@ async def test_sleeptime_group_chat(server, actor):
|
||||
]
|
||||
run_ids = []
|
||||
for i, text in enumerate(message_text):
|
||||
response = await server.send_message_to_agent(
|
||||
response = client.agents.messages.create(
|
||||
agent_id=main_agent.id,
|
||||
actor=actor,
|
||||
input_messages=[
|
||||
MessageCreate(
|
||||
role="user",
|
||||
content=text,
|
||||
),
|
||||
],
|
||||
stream_steps=False,
|
||||
stream_tokens=False,
|
||||
)
|
||||
|
||||
assert len(response.messages) > 0
|
||||
assert len(response.usage.run_ids or []) == (i + 1) % 2
|
||||
run_ids.extend(response.usage.run_ids or [])
|
||||
|
||||
jobs = server.job_manager.list_jobs(actor=actor, job_type=JobType.RUN)
|
||||
runs = [Run.from_job(job) for job in jobs]
|
||||
agent_runs = [run for run in runs if "agent_id" in run.metadata and run.metadata["agent_id"] == sleeptime_agent_id]
|
||||
assert len(agent_runs) == len(run_ids)
|
||||
|
||||
# 6. Verify run status after sleep
|
||||
time.sleep(2)
|
||||
for run_id in run_ids:
|
||||
job = server.job_manager.get_job_by_id(job_id=run_id, actor=actor)
|
||||
assert job.status == JobStatus.running or job.status == JobStatus.completed
|
||||
|
||||
# 7. Delete agent
|
||||
server.agent_manager.delete_agent(agent_id=main_agent.id, actor=actor)
|
||||
|
||||
with pytest.raises(NoResultFound):
|
||||
server.group_manager.retrieve_group(group_id=group.id, actor=actor)
|
||||
with pytest.raises(NoResultFound):
|
||||
server.agent_manager.get_agent_by_id(agent_id=sleeptime_agent_id, actor=actor)
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="module")
|
||||
async def test_sleeptime_group_chat_v2(server, actor):
|
||||
# 0. Refresh base tools
|
||||
server.tool_manager.upsert_base_tools(actor=actor)
|
||||
|
||||
# 1. Create sleeptime agent
|
||||
main_agent = server.create_agent(
|
||||
request=CreateAgent(
|
||||
name="main_agent",
|
||||
memory_blocks=[
|
||||
CreateBlock(
|
||||
label="persona",
|
||||
value="You are a personal assistant that helps users with requests.",
|
||||
),
|
||||
CreateBlock(
|
||||
label="human",
|
||||
value="My favorite plant is the fiddle leaf\nMy favorite color is lavender",
|
||||
),
|
||||
],
|
||||
# model="openai/gpt-4o-mini",
|
||||
model="anthropic/claude-3-5-sonnet-20240620",
|
||||
embedding="openai/text-embedding-3-small",
|
||||
enable_sleeptime=True,
|
||||
include_base_tool_rules=True,
|
||||
),
|
||||
actor=actor,
|
||||
)
|
||||
|
||||
assert main_agent.enable_sleeptime == True
|
||||
main_agent_tools = [tool.name for tool in main_agent.tools]
|
||||
assert "core_memory_append" not in main_agent_tools
|
||||
assert "core_memory_replace" not in main_agent_tools
|
||||
assert "archival_memory_insert" not in main_agent_tools
|
||||
|
||||
# 2. Override frequency for test
|
||||
group = await server.group_manager.modify_group_async(
|
||||
group_id=main_agent.multi_agent_group.id,
|
||||
group_update=GroupUpdate(
|
||||
manager_config=SleeptimeManagerUpdate(
|
||||
sleeptime_agent_frequency=2,
|
||||
),
|
||||
),
|
||||
actor=actor,
|
||||
)
|
||||
|
||||
assert group.manager_type == ManagerType.sleeptime
|
||||
assert group.sleeptime_agent_frequency == 2
|
||||
assert len(group.agent_ids) == 1
|
||||
|
||||
# 3. Verify shared blocks
|
||||
sleeptime_agent_id = group.agent_ids[0]
|
||||
shared_block = server.agent_manager.get_block_with_label(agent_id=main_agent.id, block_label="human", actor=actor)
|
||||
agents = await server.block_manager.get_agents_for_block_async(block_id=shared_block.id, actor=actor)
|
||||
assert len(agents) == 2
|
||||
assert sleeptime_agent_id in [agent.id for agent in agents]
|
||||
assert main_agent.id in [agent.id for agent in agents]
|
||||
|
||||
# 4 Verify sleeptime agent tools
|
||||
sleeptime_agent = server.agent_manager.get_agent_by_id(agent_id=sleeptime_agent_id, actor=actor)
|
||||
sleeptime_agent_tools = [tool.name for tool in sleeptime_agent.tools]
|
||||
assert "memory_rethink" in sleeptime_agent_tools
|
||||
assert "memory_finish_edits" in sleeptime_agent_tools
|
||||
assert "memory_replace" in sleeptime_agent_tools
|
||||
assert "memory_insert" in sleeptime_agent_tools
|
||||
|
||||
assert len([rule for rule in sleeptime_agent.tool_rules if rule.type == ToolRuleType.exit_loop]) > 0
|
||||
|
||||
# 5. Send messages and verify run ids
|
||||
message_text = [
|
||||
"my favorite color is orange",
|
||||
"not particularly. today is a good day",
|
||||
"actually my favorite color is coral",
|
||||
"let's change the subject",
|
||||
"actually my fav plant is the the african spear",
|
||||
"indeed",
|
||||
]
|
||||
run_ids = []
|
||||
for i, text in enumerate(message_text):
|
||||
agent = SleeptimeMultiAgentV2(
|
||||
agent_id=main_agent.id,
|
||||
message_manager=server.message_manager,
|
||||
agent_manager=server.agent_manager,
|
||||
block_manager=server.block_manager,
|
||||
passage_manager=server.passage_manager,
|
||||
group_manager=server.group_manager,
|
||||
job_manager=server.job_manager,
|
||||
actor=actor,
|
||||
group=main_agent.multi_agent_group,
|
||||
step_manager=server.step_manager,
|
||||
)
|
||||
|
||||
response = await agent.step(
|
||||
input_messages=[
|
||||
messages=[
|
||||
MessageCreate(
|
||||
role="user",
|
||||
content=text,
|
||||
@@ -273,172 +149,153 @@ async def test_sleeptime_group_chat_v2(server, actor):
|
||||
assert len(response.usage.run_ids or []) == (i + 1) % 2
|
||||
run_ids.extend(response.usage.run_ids or [])
|
||||
|
||||
jobs = server.job_manager.list_jobs(actor=actor, job_type=JobType.RUN)
|
||||
runs = [Run.from_job(job) for job in jobs]
|
||||
runs = client.runs.list()
|
||||
agent_runs = [run for run in runs if "agent_id" in run.metadata and run.metadata["agent_id"] == sleeptime_agent_id]
|
||||
assert len(agent_runs) == len(run_ids)
|
||||
|
||||
# 6. Verify run status after sleep
|
||||
time.sleep(2)
|
||||
for run_id in run_ids:
|
||||
job = server.job_manager.get_job_by_id(job_id=run_id, actor=actor)
|
||||
job = client.runs.retrieve(run_id=run_id)
|
||||
assert job.status == JobStatus.running or job.status == JobStatus.completed
|
||||
|
||||
# 7. Delete agent
|
||||
server.agent_manager.delete_agent(agent_id=main_agent.id, actor=actor)
|
||||
client.agents.delete(agent_id=main_agent.id)
|
||||
|
||||
with pytest.raises(NoResultFound):
|
||||
server.group_manager.retrieve_group(group_id=group.id, actor=actor)
|
||||
with pytest.raises(NoResultFound):
|
||||
server.agent_manager.get_agent_by_id(agent_id=sleeptime_agent_id, actor=actor)
|
||||
with pytest.raises(ApiError):
|
||||
client.groups.retrieve(group_id=group.id)
|
||||
with pytest.raises(ApiError):
|
||||
client.agents.retrieve(agent_id=sleeptime_agent_id)
|
||||
|
||||
|
||||
@pytest.mark.skip
|
||||
@pytest.mark.asyncio(loop_scope="module")
|
||||
async def test_sleeptime_removes_redundant_information(server, actor):
|
||||
async def test_sleeptime_removes_redundant_information(client):
|
||||
# 1. set up sleep-time agent as in test_sleeptime_group_chat
|
||||
server.tool_manager.upsert_base_tools(actor=actor)
|
||||
main_agent = server.create_agent(
|
||||
request=CreateAgent(
|
||||
name="main_agent",
|
||||
memory_blocks=[
|
||||
CreateBlock(
|
||||
label="persona",
|
||||
value="You are a personal assistant that helps users with requests.",
|
||||
),
|
||||
CreateBlock(
|
||||
label="human",
|
||||
value="My favorite plant is the fiddle leaf\nMy favorite dog is the husky\nMy favorite plant is the fiddle leaf\nMy favorite plant is the fiddle leaf",
|
||||
),
|
||||
],
|
||||
model="anthropic/claude-3-5-sonnet-20240620",
|
||||
embedding="openai/text-embedding-3-small",
|
||||
enable_sleeptime=True,
|
||||
),
|
||||
actor=actor,
|
||||
client.tools.upsert_base_tools()
|
||||
main_agent = client.agents.create(
|
||||
name="main_agent",
|
||||
memory_blocks=[
|
||||
CreateBlock(
|
||||
label="persona",
|
||||
value="You are a personal assistant that helps users with requests.",
|
||||
),
|
||||
CreateBlock(
|
||||
label="human",
|
||||
value="My favorite plant is the fiddle leaf\nMy favorite dog is the husky\nMy favorite plant is the fiddle leaf\nMy favorite plant is the fiddle leaf",
|
||||
),
|
||||
],
|
||||
model="anthropic/claude-3-5-sonnet-20240620",
|
||||
embedding="openai/text-embedding-3-small",
|
||||
enable_sleeptime=True,
|
||||
)
|
||||
|
||||
group = await server.group_manager.modify_group_async(
|
||||
group = client.groups.modify(
|
||||
group_id=main_agent.multi_agent_group.id,
|
||||
group_update=GroupUpdate(
|
||||
manager_config=SleeptimeManagerUpdate(
|
||||
sleeptime_agent_frequency=1,
|
||||
),
|
||||
manager_config=SleeptimeManagerUpdate(
|
||||
sleeptime_agent_frequency=1,
|
||||
),
|
||||
actor=actor,
|
||||
)
|
||||
sleeptime_agent_id = group.agent_ids[0]
|
||||
shared_block = server.agent_manager.get_block_with_label(agent_id=main_agent.id, block_label="human", actor=actor)
|
||||
shared_block = client.agents.blocks.retrieve(agent_id=main_agent.id, block_label="human")
|
||||
count_before_memory_edits = shared_block.value.count("fiddle leaf")
|
||||
test_messages = ["hello there", "my favorite bird is the sparrow"]
|
||||
|
||||
for test_message in test_messages:
|
||||
_ = await server.send_message_to_agent(
|
||||
_ = client.agents.messages.create(
|
||||
agent_id=main_agent.id,
|
||||
actor=actor,
|
||||
input_messages=[
|
||||
messages=[
|
||||
MessageCreate(
|
||||
role="user",
|
||||
content=test_message,
|
||||
),
|
||||
],
|
||||
stream_steps=False,
|
||||
stream_tokens=False,
|
||||
)
|
||||
# 2. Allow memory blocks time to update
|
||||
time.sleep(5)
|
||||
|
||||
# 3. Check that the memory blocks have been collapsed
|
||||
shared_block = server.agent_manager.get_block_with_label(agent_id=main_agent.id, block_label="human", actor=actor)
|
||||
shared_block = client.agents.blocks.retrieve(agent_id=main_agent.id, block_label="human")
|
||||
count_after_memory_edits = shared_block.value.count("fiddle leaf")
|
||||
assert count_after_memory_edits < count_before_memory_edits
|
||||
|
||||
# 4. Delete agent
|
||||
server.agent_manager.delete_agent(agent_id=main_agent.id, actor=actor)
|
||||
client.agents.delete(agent_id=main_agent.id)
|
||||
|
||||
with pytest.raises(NoResultFound):
|
||||
server.group_manager.retrieve_group(group_id=group.id, actor=actor)
|
||||
with pytest.raises(NoResultFound):
|
||||
server.agent_manager.get_agent_by_id(agent_id=sleeptime_agent_id, actor=actor)
|
||||
with pytest.raises(ApiError):
|
||||
client.groups.retrieve(group_id=group.id)
|
||||
with pytest.raises(ApiError):
|
||||
client.agents.retrieve(agent_id=sleeptime_agent_id)
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="module")
|
||||
async def test_sleeptime_edit(server, actor):
|
||||
sleeptime_agent = server.create_agent(
|
||||
request=CreateAgent(
|
||||
name="sleeptime_agent",
|
||||
agent_type="sleeptime_agent",
|
||||
memory_blocks=[
|
||||
CreateBlock(
|
||||
label="human",
|
||||
value=get_human_text(DEFAULT_HUMAN),
|
||||
limit=2000,
|
||||
),
|
||||
CreateBlock(
|
||||
label="memory_persona",
|
||||
value=get_persona_text("sleeptime_memory_persona"),
|
||||
limit=2000,
|
||||
),
|
||||
CreateBlock(
|
||||
label="fact_block",
|
||||
value="""Messi resides in the Paris.
|
||||
Messi plays in the league Ligue 1.
|
||||
Messi plays for the team Paris Saint-Germain.
|
||||
The national team Messi plays for is the Argentina team.
|
||||
Messi is also known as Leo Messi
|
||||
Victor Ulloa plays for Inter Miami""",
|
||||
limit=2000,
|
||||
),
|
||||
],
|
||||
model="anthropic/claude-3-5-sonnet-20240620",
|
||||
embedding="openai/text-embedding-3-small",
|
||||
enable_sleeptime=True,
|
||||
),
|
||||
actor=actor,
|
||||
async def test_sleeptime_edit(client):
|
||||
sleeptime_agent = client.agents.create(
|
||||
name="sleeptime_agent",
|
||||
agent_type="sleeptime_agent",
|
||||
memory_blocks=[
|
||||
CreateBlock(
|
||||
label="human",
|
||||
value=get_human_text(DEFAULT_HUMAN),
|
||||
limit=2000,
|
||||
),
|
||||
CreateBlock(
|
||||
label="memory_persona",
|
||||
value=get_persona_text("sleeptime_memory_persona"),
|
||||
limit=2000,
|
||||
),
|
||||
CreateBlock(
|
||||
label="fact_block",
|
||||
value="""Messi resides in the Paris.
|
||||
Messi plays in the league Ligue 1.
|
||||
Messi plays for the team Paris Saint-Germain.
|
||||
The national team Messi plays for is the Argentina team.
|
||||
Messi is also known as Leo Messi
|
||||
Victor Ulloa plays for Inter Miami""",
|
||||
limit=2000,
|
||||
),
|
||||
],
|
||||
model="anthropic/claude-3-5-sonnet-20240620",
|
||||
embedding="openai/text-embedding-3-small",
|
||||
enable_sleeptime=True,
|
||||
)
|
||||
|
||||
_ = await server.send_message_to_agent(
|
||||
_ = client.agents.messages.create(
|
||||
agent_id=sleeptime_agent.id,
|
||||
actor=actor,
|
||||
input_messages=[
|
||||
messages=[
|
||||
MessageCreate(
|
||||
role="user",
|
||||
content="Messi has now moved to playing for Inter Miami",
|
||||
),
|
||||
],
|
||||
stream_steps=False,
|
||||
stream_tokens=False,
|
||||
)
|
||||
fact_block = server.agent_manager.get_block_with_label(agent_id=sleeptime_agent.id, block_label="fact_block", actor=actor)
|
||||
fact_block = client.agents.blocks.retrieve(agent_id=sleeptime_agent.id, block_label="fact_block")
|
||||
print(fact_block.value)
|
||||
assert fact_block.value.count("Inter Miami") > 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="module")
|
||||
async def test_sleeptime_agent_new_block_attachment(server, actor):
|
||||
async def test_sleeptime_agent_new_block_attachment(client):
|
||||
"""Test that a new block created after agent creation is properly attached to both main and sleeptime agents."""
|
||||
# 0. Refresh base tools
|
||||
server.tool_manager.upsert_base_tools(actor=actor)
|
||||
client.tools.upsert_base_tools()
|
||||
|
||||
# 1. Create sleeptime agent
|
||||
main_agent = server.create_agent(
|
||||
request=CreateAgent(
|
||||
name="main_agent",
|
||||
memory_blocks=[
|
||||
CreateBlock(
|
||||
label="persona",
|
||||
value="You are a personal assistant that helps users with requests.",
|
||||
),
|
||||
CreateBlock(
|
||||
label="human",
|
||||
value="My favorite plant is the fiddle leaf\nMy favorite color is lavender",
|
||||
),
|
||||
],
|
||||
model="anthropic/claude-3-5-sonnet-20240620",
|
||||
embedding="openai/text-embedding-3-small",
|
||||
enable_sleeptime=True,
|
||||
),
|
||||
actor=actor,
|
||||
main_agent = client.agents.create(
|
||||
name="main_agent",
|
||||
memory_blocks=[
|
||||
CreateBlock(
|
||||
label="persona",
|
||||
value="You are a personal assistant that helps users with requests.",
|
||||
),
|
||||
CreateBlock(
|
||||
label="human",
|
||||
value="My favorite plant is the fiddle leaf\nMy favorite color is lavender",
|
||||
),
|
||||
],
|
||||
model="anthropic/claude-3-5-sonnet-20240620",
|
||||
embedding="openai/text-embedding-3-small",
|
||||
enable_sleeptime=True,
|
||||
)
|
||||
|
||||
assert main_agent.enable_sleeptime == True
|
||||
@@ -448,13 +305,13 @@ async def test_sleeptime_agent_new_block_attachment(server, actor):
|
||||
sleeptime_agent_id = group.agent_ids[0]
|
||||
|
||||
# 3. Verify initial shared blocks
|
||||
main_agent_refreshed = server.agent_manager.get_agent_by_id(agent_id=main_agent.id, actor=actor)
|
||||
main_agent_refreshed = client.agents.retrieve(agent_id=main_agent.id)
|
||||
initial_blocks = main_agent_refreshed.memory.blocks
|
||||
initial_block_count = len(initial_blocks)
|
||||
|
||||
# Verify both agents share the initial blocks
|
||||
for block in initial_blocks:
|
||||
agents = await server.block_manager.get_agents_for_block_async(block_id=block.id, actor=actor)
|
||||
agents = client.blocks.agents.list(block_id=block.id)
|
||||
assert len(agents) == 2
|
||||
assert sleeptime_agent_id in [agent.id for agent in agents]
|
||||
assert main_agent.id in [agent.id for agent in agents]
|
||||
@@ -462,26 +319,23 @@ async def test_sleeptime_agent_new_block_attachment(server, actor):
|
||||
# 4. Create a new block after agent creation
|
||||
from letta.schemas.block import Block as PydanticBlock
|
||||
|
||||
new_block = server.block_manager.create_or_update_block(
|
||||
PydanticBlock(
|
||||
label="preferences",
|
||||
value="My favorite season is autumn\nI prefer tea over coffee",
|
||||
),
|
||||
actor=actor,
|
||||
new_block = client.blocks.create(
|
||||
label="preferences",
|
||||
value="My favorite season is autumn\nI prefer tea over coffee",
|
||||
)
|
||||
|
||||
# 5. Attach the new block to the main agent
|
||||
server.agent_manager.attach_block(agent_id=main_agent.id, block_id=new_block.id, actor=actor)
|
||||
client.agents.blocks.attach(agent_id=main_agent.id, block_id=new_block.id)
|
||||
|
||||
# 6. Verify the new block is attached to the main agent
|
||||
main_agent_refreshed = server.agent_manager.get_agent_by_id(agent_id=main_agent.id, actor=actor)
|
||||
main_agent_refreshed = client.agents.retrieve(agent_id=main_agent.id)
|
||||
main_agent_blocks = main_agent_refreshed.memory.blocks
|
||||
assert len(main_agent_blocks) == initial_block_count + 1
|
||||
main_agent_block_ids = [block.id for block in main_agent_blocks]
|
||||
assert new_block.id in main_agent_block_ids
|
||||
|
||||
# 7. Check if the new block is also attached to the sleeptime agent (this is where the bug might be)
|
||||
sleeptime_agent = server.agent_manager.get_agent_by_id(agent_id=sleeptime_agent_id, actor=actor)
|
||||
sleeptime_agent = client.agents.retrieve(agent_id=sleeptime_agent_id)
|
||||
sleeptime_agent_blocks = sleeptime_agent.memory.blocks
|
||||
sleeptime_agent_block_ids = [block.id for block in sleeptime_agent_blocks]
|
||||
|
||||
@@ -489,7 +343,7 @@ async def test_sleeptime_agent_new_block_attachment(server, actor):
|
||||
assert new_block.id in sleeptime_agent_block_ids, f"New block {new_block.id} not attached to sleeptime agent {sleeptime_agent_id}"
|
||||
|
||||
# 8. Verify that agents sharing the new block include both main and sleeptime agents
|
||||
agents_with_new_block = await server.block_manager.get_agents_for_block_async(block_id=new_block.id, actor=actor)
|
||||
agents_with_new_block = client.blocks.agents.list(block_id=new_block.id)
|
||||
agent_ids_with_new_block = [agent.id for agent in agents_with_new_block]
|
||||
|
||||
assert main_agent.id in agent_ids_with_new_block, "Main agent should have access to the new block"
|
||||
@@ -497,4 +351,4 @@ async def test_sleeptime_agent_new_block_attachment(server, actor):
|
||||
assert len(agents_with_new_block) == 2, "Both main and sleeptime agents should share the new block"
|
||||
|
||||
# 9. Clean up
|
||||
server.agent_manager.delete_agent(agent_id=main_agent.id, actor=actor)
|
||||
client.agents.delete(agent_id=main_agent.id)
|
||||
|
||||
Reference in New Issue
Block a user