From 45f09b1dfbfcbd4c817aa421b52d5abf64abadaf Mon Sep 17 00:00:00 2001 From: Matthew Zhou Date: Mon, 21 Jul 2025 11:27:21 -0700 Subject: [PATCH] feat: Reduce database calls in import/export agent file (#3455) --- letta/services/agent_file_manager.py | 96 ++++++++--- letta/services/tool_manager.py | 50 ++++++ tests/test_managers.py | 249 +++++++++++++++++++++++++++ 3 files changed, 368 insertions(+), 27 deletions(-) diff --git a/letta/services/agent_file_manager.py b/letta/services/agent_file_manager.py index da278912..40aedabb 100644 --- a/letta/services/agent_file_manager.py +++ b/letta/services/agent_file_manager.py @@ -149,7 +149,7 @@ class AgentFileManager: return sorted(unique_blocks.values(), key=lambda x: x.label) async def _extract_unique_sources_and_files_from_agents( - self, agent_states: List[AgentState], actor: User + self, agent_states: List[AgentState], actor: User, files_agents_cache: dict = None ) -> tuple[List[Source], List[FileMetadata]]: """Extract unique sources and files from agent states using bulk operations""" @@ -160,6 +160,10 @@ class AgentFileManager: files_agents = await self.file_agent_manager.list_files_for_agent( agent_id=agent_state.id, actor=actor, is_open_only=False, return_as_blocks=False ) + # cache the results for reuse during conversion + if files_agents_cache is not None: + files_agents_cache[agent_state.id] = files_agents + for file_agent in files_agents: all_source_ids.add(file_agent.source_id) all_file_ids.add(file_agent.file_id) @@ -168,13 +172,18 @@ class AgentFileManager: return sources, files - async def _convert_agent_state_to_schema(self, agent_state: AgentState, actor: User) -> AgentSchema: + async def _convert_agent_state_to_schema(self, agent_state: AgentState, actor: User, files_agents_cache: dict = None) -> AgentSchema: """Convert AgentState to AgentSchema with ID remapping""" agent_file_id = self._map_db_to_file_id(agent_state.id, AgentSchema.__id_prefix__) - files_agents = await self.file_agent_manager.list_files_for_agent( - agent_id=agent_state.id, actor=actor, is_open_only=False, return_as_blocks=False - ) + + # use cached file-agent data if available, otherwise fetch + if files_agents_cache is not None and agent_state.id in files_agents_cache: + files_agents = files_agents_cache[agent_state.id] + else: + files_agents = await self.file_agent_manager.list_files_for_agent( + agent_id=agent_state.id, actor=actor, is_open_only=False, return_as_blocks=False + ) agent_schema = await AgentSchema.from_agent_state( agent_state, message_manager=self.message_manager, files_agents=files_agents, actor=actor ) @@ -264,15 +273,21 @@ class AgentFileManager: missing_ids = [agent_id for agent_id in agent_ids if agent_id not in found_ids] raise AgentFileExportError(f"The following agent IDs were not found: {missing_ids}") + # cache for file-agent relationships to avoid duplicate queries + files_agents_cache = {} # Maps agent_id to list of file_agent relationships + # Extract unique entities across all agents tool_set = self._extract_unique_tools(agent_states) block_set = self._extract_unique_blocks(agent_states) - # Extract sources and files from agent states BEFORE conversion - source_set, file_set = await self._extract_unique_sources_and_files_from_agents(agent_states, actor) + # Extract sources and files from agent states BEFORE conversion (with caching) + source_set, file_set = await self._extract_unique_sources_and_files_from_agents(agent_states, actor, files_agents_cache) - # Convert to schemas with ID remapping - agent_schemas = [await self._convert_agent_state_to_schema(agent_state, actor=actor) for agent_state in agent_states] + # Convert to schemas with ID remapping (reusing cached file-agent data) + agent_schemas = [ + await self._convert_agent_state_to_schema(agent_state, actor=actor, files_agents_cache=files_agents_cache) + for agent_state in agent_states + ] tool_schemas = [self._convert_tool_to_schema(tool) for tool in tool_set] block_schemas = [self._convert_block_to_schema(block) for block in block_set] source_schemas = [self._convert_source_to_schema(source) for source in source_set] @@ -332,23 +347,44 @@ class AgentFileManager: # Import in dependency order imported_count = 0 file_to_db_ids = {} # Maps file IDs to new database IDs + # in-memory cache for file metadata to avoid repeated db calls + file_metadata_cache = {} # Maps database file ID to FileMetadata - # 1. Create tools first (no dependencies) - for tool_schema in schema.tools: - # Convert ToolSchema back to ToolCreate - created_tool = await self.tool_manager.create_or_update_tool_async( - pydantic_tool=Tool(**tool_schema.model_dump(exclude={"id"})), actor=actor - ) - file_to_db_ids[tool_schema.id] = created_tool.id - imported_count += 1 + # 1. Create tools first (no dependencies) - using bulk upsert for efficiency + if schema.tools: + # convert tool schemas to pydantic tools + pydantic_tools = [] + for tool_schema in schema.tools: + pydantic_tools.append(Tool(**tool_schema.model_dump(exclude={"id"}))) - # 2. Create blocks (no dependencies) - for block_schema in schema.blocks: - # Convert BlockSchema back to CreateBlock - block = Block(**block_schema.model_dump(exclude={"id"})) - created_block = await self.block_manager.create_or_update_block_async(block, actor) - file_to_db_ids[block_schema.id] = created_block.id - imported_count += 1 + # bulk upsert all tools at once + created_tools = await self.tool_manager.bulk_upsert_tools_async(pydantic_tools, actor) + + # map file ids to database ids + # note: tools are matched by name during upsert, so we need to match by name here too + created_tools_by_name = {tool.name: tool for tool in created_tools} + for tool_schema in schema.tools: + created_tool = created_tools_by_name.get(tool_schema.name) + if created_tool: + file_to_db_ids[tool_schema.id] = created_tool.id + imported_count += 1 + else: + logger.warning(f"Tool {tool_schema.name} was not created during bulk upsert") + + # 2. Create blocks (no dependencies) - using batch create for efficiency + if schema.blocks: + # convert block schemas to pydantic blocks (excluding IDs to create new blocks) + pydantic_blocks = [] + for block_schema in schema.blocks: + pydantic_blocks.append(Block(**block_schema.model_dump(exclude={"id"}))) + + # batch create all blocks at once + created_blocks = await self.block_manager.batch_create_blocks_async(pydantic_blocks, actor) + + # map file ids to database ids + for block_schema, created_block in zip(schema.blocks, created_blocks): + file_to_db_ids[block_schema.id] = created_block.id + imported_count += 1 # 3. Create sources (no dependencies) for source_schema in schema.sources: @@ -383,8 +419,10 @@ class AgentFileManager: file_db_id = file_to_db_ids[file_schema.id] source_db_id = file_to_db_ids[file_schema.source_id] - # Get the created file metadata - file_metadata = await self.file_manager.get_file_by_id(file_db_id, actor) + # Get the created file metadata (with caching) + if file_db_id not in file_metadata_cache: + file_metadata_cache[file_db_id] = await self.file_manager.get_file_by_id(file_db_id, actor) + file_metadata = file_metadata_cache[file_db_id] # Save the db call of fetching content again file_metadata.content = file_schema.content @@ -447,7 +485,11 @@ class AgentFileManager: for file_agent_schema in agent_schema.files_agents: file_db_id = file_to_db_ids[file_agent_schema.file_id] - file_metadata = await self.file_manager.get_file_by_id(file_db_id, actor) + + # Use cached file metadata if available + if file_db_id not in file_metadata_cache: + file_metadata_cache[file_db_id] = await self.file_manager.get_file_by_id(file_db_id, actor) + file_metadata = file_metadata_cache[file_db_id] files_for_agent.append(file_metadata) if file_agent_schema.visible_content: diff --git a/letta/services/tool_manager.py b/letta/services/tool_manager.py index 13254049..b4af8d4b 100644 --- a/letta/services/tool_manager.py +++ b/letta/services/tool_manager.py @@ -176,6 +176,56 @@ class ToolManager: await tool.create_async(session, actor=actor) # Re-raise other database-related errors return tool.to_pydantic() + @enforce_types + @trace_method + async def bulk_upsert_tools_async(self, pydantic_tools: List[PydanticTool], actor: PydanticUser) -> List[PydanticTool]: + """ + Bulk create or update multiple tools in a single database transaction. + + Uses optimized PostgreSQL bulk upsert when available, falls back to individual + upserts for SQLite. This is much more efficient than calling create_or_update_tool_async + in a loop. + + IMPORTANT BEHAVIOR NOTES: + - Tools are matched by (name, organization_id) unique constraint, NOT by ID + - If a tool with the same name already exists for the organization, it will be updated + regardless of any ID provided in the input tool + - The existing tool's ID is preserved during updates + - If you provide a tool with an explicit ID but a name that matches an existing tool, + the existing tool will be updated and the provided ID will be ignored + - This matches the behavior of create_or_update_tool_async which also matches by name + + PostgreSQL optimization: + - Uses native ON CONFLICT (name, organization_id) DO UPDATE for atomic upserts + - All tools are processed in a single SQL statement for maximum efficiency + + SQLite fallback: + - Falls back to individual create_or_update_tool_async calls + - Still benefits from batched transaction handling + + Args: + pydantic_tools: List of tools to create or update + actor: User performing the action + + Returns: + List of created/updated tools + """ + if not pydantic_tools: + return [] + + # auto-generate descriptions if not provided + for tool in pydantic_tools: + if tool.description is None: + tool.description = tool.json_schema.get("description", None) + + if settings.letta_pg_uri_no_default: + # use optimized postgresql bulk upsert + async with db_registry.async_session() as session: + return await self._bulk_upsert_postgresql(session, pydantic_tools, actor) + else: + # fallback to individual upserts for sqlite + return await self._upsert_tools_individually(pydantic_tools, actor) + @enforce_types @trace_method def get_tool_by_id(self, tool_id: str, actor: PydanticUser) -> PydanticTool: diff --git a/tests/test_managers.py b/tests/test_managers.py index 42560f0e..5caf25e4 100644 --- a/tests/test_managers.py +++ b/tests/test_managers.py @@ -3290,6 +3290,190 @@ async def test_upsert_base_tools_with_empty_type_filter(server: SyncServer, defa assert tools == [] +@pytest.mark.asyncio +async def test_bulk_upsert_tools_async(server: SyncServer, default_user): + """Test bulk upserting multiple tools at once""" + # create multiple test tools + tools_data = [] + for i in range(5): + tool = PydanticTool( + name=f"bulk_test_tool_{i}", + description=f"Test tool {i} for bulk operations", + tags=["bulk", "test"], + source_code=f"def bulk_test_tool_{i}():\n '''Test tool {i} function'''\n return 'result_{i}'", + source_type="python", + ) + tools_data.append(tool) + + # initial bulk upsert - should create all tools + created_tools = await server.tool_manager.bulk_upsert_tools_async(tools_data, default_user) + assert len(created_tools) == 5 + assert all(t.name.startswith("bulk_test_tool_") for t in created_tools) + assert all(t.description for t in created_tools) + + # verify all tools were created + for i in range(5): + tool = await server.tool_manager.get_tool_by_name_async(f"bulk_test_tool_{i}", default_user) + assert tool is not None + assert tool.description == f"Test tool {i} for bulk operations" + + # modify some tools and upsert again - should update existing tools + tools_data[0].description = "Updated description for tool 0" + tools_data[2].tags = ["bulk", "test", "updated"] + + updated_tools = await server.tool_manager.bulk_upsert_tools_async(tools_data, default_user) + assert len(updated_tools) == 5 + + # verify updates were applied + tool_0 = await server.tool_manager.get_tool_by_name_async("bulk_test_tool_0", default_user) + assert tool_0.description == "Updated description for tool 0" + + tool_2 = await server.tool_manager.get_tool_by_name_async("bulk_test_tool_2", default_user) + assert "updated" in tool_2.tags + + # test with empty list + empty_result = await server.tool_manager.bulk_upsert_tools_async([], default_user) + assert empty_result == [] + + # test with tools missing descriptions (should auto-generate from json schema) + no_desc_tool = PydanticTool( + name="no_description_tool", + tags=["test"], + source_code="def no_description_tool():\n '''This is a docstring description'''\n return 'result'", + source_type="python", + ) + result = await server.tool_manager.bulk_upsert_tools_async([no_desc_tool], default_user) + assert len(result) == 1 + assert result[0].description is not None # should be auto-generated from docstring + + +@pytest.mark.asyncio +async def test_bulk_upsert_tools_name_conflict(server: SyncServer, default_user): + """Test bulk upserting tools handles name+org_id unique constraint correctly""" + + # create a tool with a specific name + original_tool = PydanticTool( + name="unique_name_tool", + description="Original description", + tags=["original"], + source_code="def unique_name_tool():\n '''Original function'''\n return 'original'", + source_type="python", + ) + + # create it + created = await server.tool_manager.create_tool_async(original_tool, default_user) + original_id = created.id + + # now try to bulk upsert with same name but different id + conflicting_tool = PydanticTool( + name="unique_name_tool", # same name + description="Updated via bulk upsert", + tags=["updated", "bulk"], + source_code="def unique_name_tool():\n '''Updated function'''\n return 'updated'", + source_type="python", + ) + + # bulk upsert should update the existing tool based on name conflict + result = await server.tool_manager.bulk_upsert_tools_async([conflicting_tool], default_user) + assert len(result) == 1 + assert result[0].name == "unique_name_tool" + assert result[0].description == "Updated via bulk upsert" + assert "updated" in result[0].tags + assert "bulk" in result[0].tags + + # verify only one tool exists with this name + all_tools = await server.tool_manager.list_tools_async(actor=default_user) + tools_with_name = [t for t in all_tools if t.name == "unique_name_tool"] + assert len(tools_with_name) == 1 + + # the id should remain the same as the original + assert tools_with_name[0].id == original_id + + +@pytest.mark.asyncio +async def test_bulk_upsert_tools_mixed_create_update(server: SyncServer, default_user): + """Test bulk upserting with mix of new tools and updates to existing ones""" + + # create some existing tools + existing_tools = [] + for i in range(3): + tool = PydanticTool( + name=f"existing_tool_{i}", + description=f"Existing tool {i}", + tags=["existing"], + source_code=f"def existing_tool_{i}():\n '''Existing {i}'''\n return 'existing_{i}'", + source_type="python", + ) + created = await server.tool_manager.create_tool_async(tool, default_user) + existing_tools.append(created) + + # prepare bulk upsert with mix of updates and new tools + bulk_tools = [] + + # update existing tool 0 by name + bulk_tools.append( + PydanticTool( + name="existing_tool_0", # matches by name + description="Updated existing tool 0", + tags=["existing", "updated"], + source_code="def existing_tool_0():\n '''Updated 0'''\n return 'updated_0'", + source_type="python", + ) + ) + + # update existing tool 1 by name (since bulk upsert matches by name, not id) + bulk_tools.append( + PydanticTool( + name="existing_tool_1", # matches by name + description="Updated existing tool 1", + tags=["existing", "updated"], + source_code="def existing_tool_1():\n '''Updated 1'''\n return 'updated_1'", + source_type="python", + ) + ) + + # add completely new tools + for i in range(3, 6): + bulk_tools.append( + PydanticTool( + name=f"new_tool_{i}", + description=f"New tool {i}", + tags=["new"], + source_code=f"def new_tool_{i}():\n '''New {i}'''\n return 'new_{i}'", + source_type="python", + ) + ) + + # perform bulk upsert + result = await server.tool_manager.bulk_upsert_tools_async(bulk_tools, default_user) + assert len(result) == 5 # 2 updates + 3 new + + # verify updates + tool_0 = await server.tool_manager.get_tool_by_name_async("existing_tool_0", default_user) + assert tool_0.description == "Updated existing tool 0" + assert "updated" in tool_0.tags + assert tool_0.id == existing_tools[0].id # id should remain same + + # verify tool 1 was updated + tool_1 = await server.tool_manager.get_tool_by_id_async(existing_tools[1].id, default_user) + assert tool_1.name == "existing_tool_1" # name stays same + assert tool_1.description == "Updated existing tool 1" + assert "updated" in tool_1.tags + + # verify new tools were created + for i in range(3, 6): + new_tool = await server.tool_manager.get_tool_by_name_async(f"new_tool_{i}", default_user) + assert new_tool is not None + assert new_tool.description == f"New tool {i}" + assert "new" in new_tool.tags + + # verify existing_tool_2 was not affected + tool_2 = await server.tool_manager.get_tool_by_id_async(existing_tools[2].id, default_user) + assert tool_2.name == "existing_tool_2" + assert tool_2.description == "Existing tool 2" + assert tool_2.tags == ["existing"] + + @pytest.mark.asyncio async def test_create_tool_with_pip_requirements(server: SyncServer, default_user, default_organization): def test_tool_with_deps(): @@ -3667,6 +3851,71 @@ def test_create_block(server: SyncServer, default_user): assert block.metadata == block_create.metadata +@pytest.mark.asyncio +async def test_batch_create_blocks_async(server: SyncServer, default_user): + """Test batch creating multiple blocks at once""" + block_manager = BlockManager() + + # create multiple test blocks + blocks_data = [] + for i in range(5): + block = PydanticBlock( + label=f"test_block_{i}", + is_template=False, + value=f"Content for block {i}", + description=f"Test block {i} for batch operations", + limit=1000 + i * 100, # varying limits + metadata={"index": i, "batch": "test"}, + ) + blocks_data.append(block) + + # batch create all blocks at once + created_blocks = await block_manager.batch_create_blocks_async(blocks_data, default_user) + + # verify all blocks were created + assert len(created_blocks) == 5 + assert all(b.label.startswith("test_block_") for b in created_blocks) + + # verify block properties were preserved + for i, block in enumerate(created_blocks): + assert block.label == f"test_block_{i}" + assert block.value == f"Content for block {i}" + assert block.description == f"Test block {i} for batch operations" + assert block.limit == 1000 + i * 100 + assert block.metadata["index"] == i + assert block.metadata["batch"] == "test" + assert block.id is not None # should have generated ids + # blocks have organization_id at the orm level, not in the pydantic model + + # verify blocks can be retrieved individually + for created_block in created_blocks: + retrieved = await block_manager.get_block_by_id_async(created_block.id, default_user) + assert retrieved.id == created_block.id + assert retrieved.label == created_block.label + assert retrieved.value == created_block.value + + # test with empty list + empty_result = await block_manager.batch_create_blocks_async([], default_user) + assert empty_result == [] + + # test creating blocks with same labels (should create separate blocks since no unique constraint) + duplicate_blocks = [ + PydanticBlock(label="duplicate_label", value="Block 1"), + PydanticBlock(label="duplicate_label", value="Block 2"), + PydanticBlock(label="duplicate_label", value="Block 3"), + ] + + created_duplicates = await block_manager.batch_create_blocks_async(duplicate_blocks, default_user) + assert len(created_duplicates) == 3 + assert all(b.label == "duplicate_label" for b in created_duplicates) + # all should have different ids + ids = [b.id for b in created_duplicates] + assert len(set(ids)) == 3 # all unique ids + # but different values + values = [b.value for b in created_duplicates] + assert set(values) == {"Block 1", "Block 2", "Block 3"} + + @pytest.mark.asyncio async def test_get_blocks(server, default_user, event_loop): block_manager = BlockManager()