From 46275f8562adc07e4e31f76da9993e6ce9887e6d Mon Sep 17 00:00:00 2001 From: cthomas Date: Wed, 16 Apr 2025 17:21:28 -0700 Subject: [PATCH] feat: add source description to sleeptime agent persona (#1733) Co-authored-by: Kevin Lin --- letta/prompts/system/sleeptime_doc_ingest.txt | 25 +++++++++ letta/server/rest_api/routers/v1/agents.py | 17 ++++++- letta/server/rest_api/routers/v1/sources.py | 23 ++++++++- letta/server/server.py | 51 ++++++++++--------- 4 files changed, 89 insertions(+), 27 deletions(-) create mode 100644 letta/prompts/system/sleeptime_doc_ingest.txt diff --git a/letta/prompts/system/sleeptime_doc_ingest.txt b/letta/prompts/system/sleeptime_doc_ingest.txt new file mode 100644 index 00000000..2156115e --- /dev/null +++ b/letta/prompts/system/sleeptime_doc_ingest.txt @@ -0,0 +1,25 @@ +You are Letta-Sleeptime-Doc-Ingest, the latest version of Limnal Corporation's memory management system, developed in 2025. + +You run in the background, organizing and maintaining the memories of an agent assistant who chats with the user. + +Your core memory unit is held inside the initial system instructions file, and is always available in-context (you will see it at all times). +Your core memory contains the essential, foundational context for keeping track of your own persona, the instructions for your document ingestion task, and high-level context of the document. + +Your core memory is made up of read-only blocks and read-write blocks. + +Read-Only Blocks: +Persona Sub-Block: Stores details about your persona, guiding how you behave. +Instructions Sub-Block: Stores instructions on how to ingest the document. + +Read-Write Blocks: +all other memory blocks correspond to data sources, which you will write to for your task. Access the target block using its label when calling `rethink_memory`. + +Memory editing: +You have the ability to make edits to the memory by calling `core_memory_insert` and `rethink_memory`. +You call `view_core_memory_with_line_numbers` to view the line numbers of a memory block, before calling `core_memory_insert`. +You call `core_memory_insert` when there is new information to add or overwrite to the memory. Use the replace flag when you want to perform a targeted edit. +To keep the memory blocks organized and readable, you call `rethink_memory` to reorganize the entire memory block so that it is comprehensive, readable, and up to date. +You continue memory editing until the blocks are organized and readable, and do not contain redundant and outdate information, then call `finish_rethinking_memory`. +If there are no meaningful updates to make to the memory, you call `finish_rethinking_memory` directly. + +You will be sent external context about the interaction, and your goal is to summarize the context and store it in the right memory blocks. diff --git a/letta/server/rest_api/routers/v1/agents.py b/letta/server/rest_api/routers/v1/agents.py index 539f34c1..c561e2f6 100644 --- a/letta/server/rest_api/routers/v1/agents.py +++ b/letta/server/rest_api/routers/v1/agents.py @@ -284,6 +284,7 @@ def detach_tool( def attach_source( agent_id: str, source_id: str, + background_tasks: BackgroundTasks, server: "SyncServer" = Depends(get_letta_server), actor_id: Optional[str] = Header(None, alias="user_id"), ): @@ -291,7 +292,11 @@ def attach_source( Attach a source to an agent. """ actor = server.user_manager.get_user_or_default(user_id=actor_id) - return server.agent_manager.attach_source(agent_id=agent_id, source_id=source_id, actor=actor) + agent = server.agent_manager.attach_source(agent_id=agent_id, source_id=source_id, actor=actor) + if agent.enable_sleeptime: + source = server.source_manager.get_source_by_id(source_id=source_id) + background_tasks.add_task(server.sleeptime_document_ingest, agent, source, actor) + return agent @router.patch("/{agent_id}/sources/detach/{source_id}", response_model=AgentState, operation_id="detach_source_from_agent") @@ -305,7 +310,15 @@ def detach_source( Detach a source from an agent. """ actor = server.user_manager.get_user_or_default(user_id=actor_id) - return server.agent_manager.detach_source(agent_id=agent_id, source_id=source_id, actor=actor) + agent = server.agent_manager.detach_source(agent_id=agent_id, source_id=source_id, actor=actor) + if agent.enable_sleeptime: + try: + source = server.source_manager.get_source_by_id(source_id=source_id) + block = server.agent_manager.get_block_with_label(agent_id=agent.id, block_label=source.name, actor=actor) + server.block_manager.delete_block(block.id, actor) + except: + pass + return agent @router.get("/{agent_id}", response_model=AgentState, operation_id="retrieve_agent") diff --git a/letta/server/rest_api/routers/v1/sources.py b/letta/server/rest_api/routers/v1/sources.py index b2b66a2c..5f08b3ea 100644 --- a/letta/server/rest_api/routers/v1/sources.py +++ b/letta/server/rest_api/routers/v1/sources.py @@ -121,7 +121,15 @@ def delete_source( Delete a data source. """ actor = server.user_manager.get_user_or_default(user_id=actor_id) - + source = server.source_manager.get_source_by_id(source_id=source_id) + agents = server.source_manager.list_attached_agents(source_id=source_id, actor=actor) + for agent in agents: + if agent.enable_sleeptime: + try: + block = server.agent_manager.get_block_with_label(agent_id=agent.id, block_label=source.name, actor=actor) + server.block_manager.delete_block(block.id, actor) + except: + pass server.delete_source(source_id=source_id, actor=actor) @@ -151,8 +159,9 @@ def upload_file_to_source( job_id = job.id server.job_manager.create_job(job, actor=actor) - # create background task + # create background tasks background_tasks.add_task(load_file_to_source_async, server, source_id=source.id, file=file, job_id=job.id, bytes=bytes, actor=actor) + background_tasks.add_task(sleeptime_document_ingest_async, server, source_id, actor) # return job information # Is this necessary? Can we just return the job from create_job? @@ -196,6 +205,7 @@ def list_source_files( def delete_file_from_source( source_id: str, file_id: str, + background_tasks: BackgroundTasks, server: "SyncServer" = Depends(get_letta_server), actor_id: Optional[str] = Header(None, alias="user_id"), # Extract user_id from header, default to None if not present ): @@ -205,6 +215,7 @@ def delete_file_from_source( actor = server.user_manager.get_user_or_default(user_id=actor_id) deleted_file = server.source_manager.delete_file(file_id=file_id, actor=actor) + background_tasks.add_task(sleeptime_document_ingest_async, server, source_id, actor, clear_history=True) if deleted_file is None: raise HTTPException(status_code=404, detail=f"File with id={file_id} not found.") @@ -222,3 +233,11 @@ def load_file_to_source_async(server: SyncServer, source_id: str, job_id: str, f # Pass the file to load_file_to_source server.load_file_to_source(source_id, file_path, job_id, actor) + + +def sleeptime_document_ingest_async(server: SyncServer, source_id: str, actor: User, clear_history: bool = False): + source = server.source_manager.get_source_by_id(source_id=source_id) + agents = server.source_manager.list_attached_agents(source_id=source_id, actor=actor) + for agent in agents: + if agent.enable_sleeptime: + server.sleeptime_document_ingest(agent, source, actor, clear_history) diff --git a/letta/server/server.py b/letta/server/server.py index 274db94f..37d806e0 100644 --- a/letta/server/server.py +++ b/letta/server/server.py @@ -1008,26 +1008,6 @@ class SyncServer(Server): new_passage_size = self.agent_manager.passage_size(actor=actor, agent_id=agent_id) assert new_passage_size >= curr_passage_size # in case empty files are added - # Process file via sleeptime agent - if agent_state.enable_sleeptime: - ephemeral_sleeptime_agent = self.create_document_sleeptime_agent( - main_agent=agent_state, source_name=source.name, actor=actor - ) - - agent = self.load_agent(agent_id=ephemeral_sleeptime_agent.id, actor=actor) - for passage in self.list_data_source_passages(source_id=source_id, user_id=actor.id): - agent.step( - messages=[ - Message( - role="user", - content=[TextContent(text=passage.text)], - agent_id=ephemeral_sleeptime_agent.id, - ), - ] - ) - - self.agent_manager.delete_agent(agent_id=ephemeral_sleeptime_agent.id, actor=actor) - # rebuild system prompt and force agent_state = self.agent_manager.rebuild_system_prompt(agent_id=agent_id, actor=actor, force=True) @@ -1039,12 +1019,33 @@ class SyncServer(Server): return job - def create_document_sleeptime_agent(self, main_agent: AgentState, source_name: str, actor: User) -> AgentState: + def sleeptime_document_ingest(self, main_agent: AgentState, source: Source, actor: User, clear_history: bool = False) -> None: + sleeptime_agent = self.create_document_sleeptime_agent(main_agent, source, actor, clear_history) + agent = self.load_agent(agent_id=sleeptime_agent.id, actor=actor) + for passage in self.list_data_source_passages(source_id=source.id, user_id=actor.id): + agent.step( + messages=[ + Message( + role="user", + content=[TextContent(text=passage.text)], + agent_id=sleeptime_agent.id, + ), + ] + ) + self.agent_manager.delete_agent(agent_id=sleeptime_agent.id, actor=actor) + + def create_document_sleeptime_agent( + self, main_agent: AgentState, source: Source, actor: User, clear_history: bool = False + ) -> AgentState: try: - block = self.agent_manager.get_block_with_label(agent_id=main_agent.id, block_label=source_name, actor=actor) + block = self.agent_manager.get_block_with_label(agent_id=main_agent.id, block_label=source.name, actor=actor) except: - block = self.block_manager.create_or_update_block(Block(label=source_name, value=""), actor=actor) + block = self.block_manager.create_or_update_block(Block(label=source.name, value=""), actor=actor) self.agent_manager.attach_block(agent_id=main_agent.id, block_id=block.id, actor=actor) + + if clear_history and block.value != "": + block = self.block_manager.update_block(block_id=block.id, block=BlockUpdate(value="")) + request = CreateAgent( name=main_agent.name + "-doc-sleeptime", system=get_system_text("sleeptime_doc_ingest"), @@ -1055,6 +1056,10 @@ class SyncServer(Server): label="persona", value=get_persona_text("sleeptime_doc_persona"), ), + CreateBlock( + label="instructions", + value=source.description, + ), ], llm_config=main_agent.llm_config, embedding_config=main_agent.embedding_config,