feat: trigger sleeptime doc processing (#1715)

Co-authored-by: Caren Thomas
Co-authored-by: Kevin Lin <kl2806@columbia.edu>
Co-authored-by: Kevin Lin <klin5061@gmail.com>
This commit is contained in:
cthomas
2025-04-15 15:31:12 -07:00
committed by GitHub
parent 90f5889404
commit d572f04285
2 changed files with 64 additions and 10 deletions

View File

@@ -34,8 +34,9 @@ from letta.interface import AgentInterface # abstract
from letta.interface import CLIInterface # for printing to terminal
from letta.log import get_logger
from letta.orm.errors import NoResultFound
from letta.prompts.gpt_system import get_system_text
from letta.schemas.agent import AgentState, AgentType, CreateAgent, UpdateAgent
from letta.schemas.block import BlockUpdate, CreateBlock
from letta.schemas.block import Block, BlockUpdate, CreateBlock
from letta.schemas.embedding_config import EmbeddingConfig
# openai schemas
@@ -996,12 +997,6 @@ class SyncServer(Server):
connector = DirectoryConnector(input_files=[file_path])
num_passages, num_documents = self.load_data(user_id=source.created_by_id, source_name=source.name, connector=connector)
# update job status
job.status = JobStatus.completed
job.metadata["num_passages"] = num_passages
job.metadata["num_documents"] = num_documents
self.job_manager.update_job_by_id(job_id=job_id, job_update=JobUpdate(**job.model_dump()), actor=actor)
# update all agents who have this source attached
agent_states = self.source_manager.list_attached_agents(source_id=source_id, actor=actor)
for agent_state in agent_states:
@@ -1009,15 +1004,74 @@ class SyncServer(Server):
# Attach source to agent
curr_passage_size = self.agent_manager.passage_size(actor=actor, agent_id=agent_id)
self.agent_manager.attach_source(agent_id=agent_state.id, source_id=source_id, actor=actor)
agent_state = self.agent_manager.attach_source(agent_id=agent_state.id, source_id=source_id, actor=actor)
new_passage_size = self.agent_manager.passage_size(actor=actor, agent_id=agent_id)
assert new_passage_size >= curr_passage_size # in case empty files are added
# Process file via sleeptime agent
if agent_state.enable_sleeptime:
ephemeral_sleeptime_agent = self.create_document_sleeptime_agent(
main_agent=agent_state, source_name=source.name, actor=actor
)
agent = self.load_agent(agent_id=ephemeral_sleeptime_agent.id, actor=actor)
for passage in self.list_data_source_passages(source_id=source_id, user_id=actor.id):
agent.step(
messages=[
Message(
role="user",
content=[TextContent(text=passage.text)],
agent_id=ephemeral_sleeptime_agent.id,
),
]
)
self.agent_manager.delete_agent(agent_id=ephemeral_sleeptime_agent.id, actor=actor)
# rebuild system prompt and force
self.agent_manager.rebuild_system_prompt(agent_id=agent_id, actor=actor, force=True)
agent_state = self.agent_manager.rebuild_system_prompt(agent_id=agent_id, actor=actor, force=True)
# update job status
job.status = JobStatus.completed
job.metadata["num_passages"] = num_passages
job.metadata["num_documents"] = num_documents
self.job_manager.update_job_by_id(job_id=job_id, job_update=JobUpdate(**job.model_dump()), actor=actor)
return job
def create_document_sleeptime_agent(self, main_agent: AgentState, source_name: str, actor: User) -> AgentState:
    """Create a sleeptime agent that ingests a data source on behalf of ``main_agent``.

    The new agent shares one memory block with the main agent: the block whose
    label equals ``source_name``. If that block cannot be fetched from the main
    agent, an empty block with that label is created and attached to the main
    agent first.

    Args:
        main_agent: Agent whose LLM config, embedding config, and project id
            are copied onto the new agent, and which owns the shared block.
        source_name: Name of the data source; used as the shared block's label.
        actor: User on whose behalf the manager calls are made.

    Returns:
        The state returned by ``agent_manager.create_agent`` for the new agent.
    """
    try:
        block = self.agent_manager.get_block_with_label(agent_id=main_agent.id, block_label=source_name, actor=actor)
    except Exception:
        # Best-effort fallback: any ordinary lookup failure is treated as
        # "no block for this source yet". Catching ``Exception`` rather than
        # using a bare ``except`` lets KeyboardInterrupt / SystemExit propagate.
        # NOTE(review): narrow this to the manager's specific not-found error
        # once the exception type it raises is confirmed.
        block = self.block_manager.create_or_update_block(Block(label=source_name, value=""), actor=actor)
        self.agent_manager.attach_block(agent_id=main_agent.id, block_id=block.id, actor=actor)

    request = CreateAgent(
        name=main_agent.name + "-doc-sleeptime",
        system=get_system_text("sleeptime_doc_ingest"),
        agent_type=AgentType.sleeptime_agent,
        # Pass the main agent's source block by id so both agents use the same block.
        block_ids=[block.id],
        memory_blocks=[
            CreateBlock(
                label="persona",
                value=(
                    "I am an expert document summarizer. "
                    "I manage the data source blocks such that they "
                    "contain everything that is important about "
                    "the corresponding files."
                ),
            ),
        ],
        llm_config=main_agent.llm_config,
        embedding_config=main_agent.embedding_config,
        project_id=main_agent.project_id,
        # Only the explicitly listed memory tools are requested; base tools are disabled.
        include_base_tools=False,
        tools=["core_memory_insert", "rethink_memory", "finish_rethinking_memory", "view_core_memory_with_line_numbers"],
    )
    return self.agent_manager.create_agent(
        agent_create=request,
        actor=actor,
    )
def load_data(
self,
user_id: str,