feat: Reduce database calls in import/export agent file (#3455)

This commit is contained in:
Matthew Zhou
2025-07-21 11:27:21 -07:00
committed by GitHub
parent e81cf011e0
commit 45f09b1dfb
3 changed files with 368 additions and 27 deletions

View File

@@ -149,7 +149,7 @@ class AgentFileManager:
return sorted(unique_blocks.values(), key=lambda x: x.label)
async def _extract_unique_sources_and_files_from_agents(
self, agent_states: List[AgentState], actor: User
self, agent_states: List[AgentState], actor: User, files_agents_cache: dict = None
) -> tuple[List[Source], List[FileMetadata]]:
"""Extract unique sources and files from agent states using bulk operations"""
@@ -160,6 +160,10 @@ class AgentFileManager:
files_agents = await self.file_agent_manager.list_files_for_agent(
agent_id=agent_state.id, actor=actor, is_open_only=False, return_as_blocks=False
)
# cache the results for reuse during conversion
if files_agents_cache is not None:
files_agents_cache[agent_state.id] = files_agents
for file_agent in files_agents:
all_source_ids.add(file_agent.source_id)
all_file_ids.add(file_agent.file_id)
@@ -168,13 +172,18 @@ class AgentFileManager:
return sources, files
async def _convert_agent_state_to_schema(self, agent_state: AgentState, actor: User) -> AgentSchema:
async def _convert_agent_state_to_schema(self, agent_state: AgentState, actor: User, files_agents_cache: dict = None) -> AgentSchema:
"""Convert AgentState to AgentSchema with ID remapping"""
agent_file_id = self._map_db_to_file_id(agent_state.id, AgentSchema.__id_prefix__)
files_agents = await self.file_agent_manager.list_files_for_agent(
agent_id=agent_state.id, actor=actor, is_open_only=False, return_as_blocks=False
)
# use cached file-agent data if available, otherwise fetch
if files_agents_cache is not None and agent_state.id in files_agents_cache:
files_agents = files_agents_cache[agent_state.id]
else:
files_agents = await self.file_agent_manager.list_files_for_agent(
agent_id=agent_state.id, actor=actor, is_open_only=False, return_as_blocks=False
)
agent_schema = await AgentSchema.from_agent_state(
agent_state, message_manager=self.message_manager, files_agents=files_agents, actor=actor
)
@@ -264,15 +273,21 @@ class AgentFileManager:
missing_ids = [agent_id for agent_id in agent_ids if agent_id not in found_ids]
raise AgentFileExportError(f"The following agent IDs were not found: {missing_ids}")
# cache for file-agent relationships to avoid duplicate queries
files_agents_cache = {} # Maps agent_id to list of file_agent relationships
# Extract unique entities across all agents
tool_set = self._extract_unique_tools(agent_states)
block_set = self._extract_unique_blocks(agent_states)
# Extract sources and files from agent states BEFORE conversion
source_set, file_set = await self._extract_unique_sources_and_files_from_agents(agent_states, actor)
# Extract sources and files from agent states BEFORE conversion (with caching)
source_set, file_set = await self._extract_unique_sources_and_files_from_agents(agent_states, actor, files_agents_cache)
# Convert to schemas with ID remapping
agent_schemas = [await self._convert_agent_state_to_schema(agent_state, actor=actor) for agent_state in agent_states]
# Convert to schemas with ID remapping (reusing cached file-agent data)
agent_schemas = [
await self._convert_agent_state_to_schema(agent_state, actor=actor, files_agents_cache=files_agents_cache)
for agent_state in agent_states
]
tool_schemas = [self._convert_tool_to_schema(tool) for tool in tool_set]
block_schemas = [self._convert_block_to_schema(block) for block in block_set]
source_schemas = [self._convert_source_to_schema(source) for source in source_set]
@@ -332,23 +347,44 @@ class AgentFileManager:
# Import in dependency order
imported_count = 0
file_to_db_ids = {} # Maps file IDs to new database IDs
# in-memory cache for file metadata to avoid repeated db calls
file_metadata_cache = {} # Maps database file ID to FileMetadata
# 1. Create tools first (no dependencies)
for tool_schema in schema.tools:
# Convert ToolSchema back to ToolCreate
created_tool = await self.tool_manager.create_or_update_tool_async(
pydantic_tool=Tool(**tool_schema.model_dump(exclude={"id"})), actor=actor
)
file_to_db_ids[tool_schema.id] = created_tool.id
imported_count += 1
# 1. Create tools first (no dependencies) - using bulk upsert for efficiency
if schema.tools:
# convert tool schemas to pydantic tools
pydantic_tools = []
for tool_schema in schema.tools:
pydantic_tools.append(Tool(**tool_schema.model_dump(exclude={"id"})))
# 2. Create blocks (no dependencies)
for block_schema in schema.blocks:
# Convert BlockSchema back to CreateBlock
block = Block(**block_schema.model_dump(exclude={"id"}))
created_block = await self.block_manager.create_or_update_block_async(block, actor)
file_to_db_ids[block_schema.id] = created_block.id
imported_count += 1
# bulk upsert all tools at once
created_tools = await self.tool_manager.bulk_upsert_tools_async(pydantic_tools, actor)
# map file ids to database ids
# note: tools are matched by name during upsert, so we need to match by name here too
created_tools_by_name = {tool.name: tool for tool in created_tools}
for tool_schema in schema.tools:
created_tool = created_tools_by_name.get(tool_schema.name)
if created_tool:
file_to_db_ids[tool_schema.id] = created_tool.id
imported_count += 1
else:
logger.warning(f"Tool {tool_schema.name} was not created during bulk upsert")
# 2. Create blocks (no dependencies) - using batch create for efficiency
if schema.blocks:
# convert block schemas to pydantic blocks (excluding IDs to create new blocks)
pydantic_blocks = []
for block_schema in schema.blocks:
pydantic_blocks.append(Block(**block_schema.model_dump(exclude={"id"})))
# batch create all blocks at once
created_blocks = await self.block_manager.batch_create_blocks_async(pydantic_blocks, actor)
# map file ids to database ids
for block_schema, created_block in zip(schema.blocks, created_blocks):
file_to_db_ids[block_schema.id] = created_block.id
imported_count += 1
# 3. Create sources (no dependencies)
for source_schema in schema.sources:
@@ -383,8 +419,10 @@ class AgentFileManager:
file_db_id = file_to_db_ids[file_schema.id]
source_db_id = file_to_db_ids[file_schema.source_id]
# Get the created file metadata
file_metadata = await self.file_manager.get_file_by_id(file_db_id, actor)
# Get the created file metadata (with caching)
if file_db_id not in file_metadata_cache:
file_metadata_cache[file_db_id] = await self.file_manager.get_file_by_id(file_db_id, actor)
file_metadata = file_metadata_cache[file_db_id]
# Save the db call of fetching content again
file_metadata.content = file_schema.content
@@ -447,7 +485,11 @@ class AgentFileManager:
for file_agent_schema in agent_schema.files_agents:
file_db_id = file_to_db_ids[file_agent_schema.file_id]
file_metadata = await self.file_manager.get_file_by_id(file_db_id, actor)
# Use cached file metadata if available
if file_db_id not in file_metadata_cache:
file_metadata_cache[file_db_id] = await self.file_manager.get_file_by_id(file_db_id, actor)
file_metadata = file_metadata_cache[file_db_id]
files_for_agent.append(file_metadata)
if file_agent_schema.visible_content:

View File

@@ -176,6 +176,56 @@ class ToolManager:
await tool.create_async(session, actor=actor) # Re-raise other database-related errors
return tool.to_pydantic()
@enforce_types
@trace_method
async def bulk_upsert_tools_async(self, pydantic_tools: List[PydanticTool], actor: PydanticUser) -> List[PydanticTool]:
    """Create or update many tools in one batched operation.

    Tools are matched on the (name, organization_id) unique constraint, NOT on
    ID: a name collision updates the existing row (its ID is preserved) and any
    ID supplied on the input tool is ignored — the same matching behavior as
    create_or_update_tool_async. On PostgreSQL the whole batch is handled by a
    single native ON CONFLICT (name, organization_id) DO UPDATE statement; on
    SQLite we fall back to per-tool upserts (still batched transactionally).

    Args:
        pydantic_tools: Tools to create or update.
        actor: User performing the action.

    Returns:
        List of created/updated tools.
    """
    if not pydantic_tools:
        return []

    # fill in missing descriptions from each tool's json schema
    # (mutates the input models in place, matching the single-tool path)
    for tool in pydantic_tools:
        if tool.description is None:
            tool.description = tool.json_schema.get("description", None)

    # sqlite has no multi-row upsert here: fall back to individual upserts
    if not settings.letta_pg_uri_no_default:
        return await self._upsert_tools_individually(pydantic_tools, actor)

    # postgresql: one bulk upsert statement inside a single session
    async with db_registry.async_session() as session:
        return await self._bulk_upsert_postgresql(session, pydantic_tools, actor)
@enforce_types
@trace_method
def get_tool_by_id(self, tool_id: str, actor: PydanticUser) -> PydanticTool:

View File

@@ -3290,6 +3290,190 @@ async def test_upsert_base_tools_with_empty_type_filter(server: SyncServer, defa
assert tools == []
@pytest.mark.asyncio
async def test_bulk_upsert_tools_async(server: SyncServer, default_user):
    """Test bulk upserting multiple tools at once"""
    # build five distinct tools for the batch
    batch = [
        PydanticTool(
            name=f"bulk_test_tool_{i}",
            description=f"Test tool {i} for bulk operations",
            tags=["bulk", "test"],
            source_code=f"def bulk_test_tool_{i}():\n '''Test tool {i} function'''\n return 'result_{i}'",
            source_type="python",
        )
        for i in range(5)
    ]

    # first pass should create every tool
    created = await server.tool_manager.bulk_upsert_tools_async(batch, default_user)
    assert len(created) == 5
    for t in created:
        assert t.name.startswith("bulk_test_tool_")
        assert t.description

    # each tool should now be retrievable by name
    for i in range(5):
        fetched = await server.tool_manager.get_tool_by_name_async(f"bulk_test_tool_{i}", default_user)
        assert fetched is not None
        assert fetched.description == f"Test tool {i} for bulk operations"

    # a second pass with edits should update existing rows in place
    batch[0].description = "Updated description for tool 0"
    batch[2].tags = ["bulk", "test", "updated"]
    second_pass = await server.tool_manager.bulk_upsert_tools_async(batch, default_user)
    assert len(second_pass) == 5

    refreshed_0 = await server.tool_manager.get_tool_by_name_async("bulk_test_tool_0", default_user)
    assert refreshed_0.description == "Updated description for tool 0"
    refreshed_2 = await server.tool_manager.get_tool_by_name_async("bulk_test_tool_2", default_user)
    assert "updated" in refreshed_2.tags

    # empty input is a no-op
    assert await server.tool_manager.bulk_upsert_tools_async([], default_user) == []

    # a tool without a description gets one auto-generated from its json schema
    docstring_only = PydanticTool(
        name="no_description_tool",
        tags=["test"],
        source_code="def no_description_tool():\n '''This is a docstring description'''\n return 'result'",
        source_type="python",
    )
    upserted = await server.tool_manager.bulk_upsert_tools_async([docstring_only], default_user)
    assert len(upserted) == 1
    assert upserted[0].description is not None  # auto-generated from docstring
@pytest.mark.asyncio
async def test_bulk_upsert_tools_name_conflict(server: SyncServer, default_user):
    """Test bulk upserting tools handles name+org_id unique constraint correctly"""
    # seed a tool whose name the bulk upsert will collide with
    seeded = await server.tool_manager.create_tool_async(
        PydanticTool(
            name="unique_name_tool",
            description="Original description",
            tags=["original"],
            source_code="def unique_name_tool():\n '''Original function'''\n return 'original'",
            source_type="python",
        ),
        default_user,
    )
    seeded_id = seeded.id

    # bulk upsert a different payload under the same name
    replacement = PydanticTool(
        name="unique_name_tool",  # same name
        description="Updated via bulk upsert",
        tags=["updated", "bulk"],
        source_code="def unique_name_tool():\n '''Updated function'''\n return 'updated'",
        source_type="python",
    )
    result = await server.tool_manager.bulk_upsert_tools_async([replacement], default_user)

    # the name conflict must resolve to an update, not a second row
    assert len(result) == 1
    assert result[0].name == "unique_name_tool"
    assert result[0].description == "Updated via bulk upsert"
    assert "updated" in result[0].tags
    assert "bulk" in result[0].tags

    # exactly one tool with this name should exist, keeping the original id
    all_tools = await server.tool_manager.list_tools_async(actor=default_user)
    matches = [t for t in all_tools if t.name == "unique_name_tool"]
    assert len(matches) == 1
    assert matches[0].id == seeded_id
@pytest.mark.asyncio
async def test_bulk_upsert_tools_mixed_create_update(server: SyncServer, default_user):
    """Test bulk upserting with mix of new tools and updates to existing ones"""
    # seed three tools that exist before the bulk call
    preexisting = []
    for i in range(3):
        seeded = await server.tool_manager.create_tool_async(
            PydanticTool(
                name=f"existing_tool_{i}",
                description=f"Existing tool {i}",
                tags=["existing"],
                source_code=f"def existing_tool_{i}():\n '''Existing {i}'''\n return 'existing_{i}'",
                source_type="python",
            ),
            default_user,
        )
        preexisting.append(seeded)

    # batch = updates for tools 0 and 1 (matched by name, not id) plus three new tools
    batch = [
        PydanticTool(
            name="existing_tool_0",  # matches by name
            description="Updated existing tool 0",
            tags=["existing", "updated"],
            source_code="def existing_tool_0():\n '''Updated 0'''\n return 'updated_0'",
            source_type="python",
        ),
        PydanticTool(
            name="existing_tool_1",  # matches by name
            description="Updated existing tool 1",
            tags=["existing", "updated"],
            source_code="def existing_tool_1():\n '''Updated 1'''\n return 'updated_1'",
            source_type="python",
        ),
    ]
    for i in range(3, 6):
        batch.append(
            PydanticTool(
                name=f"new_tool_{i}",
                description=f"New tool {i}",
                tags=["new"],
                source_code=f"def new_tool_{i}():\n '''New {i}'''\n return 'new_{i}'",
                source_type="python",
            )
        )

    result = await server.tool_manager.bulk_upsert_tools_async(batch, default_user)
    assert len(result) == 5  # 2 updates + 3 new

    # tool 0 was updated in place and kept its id
    tool_0 = await server.tool_manager.get_tool_by_name_async("existing_tool_0", default_user)
    assert tool_0.description == "Updated existing tool 0"
    assert "updated" in tool_0.tags
    assert tool_0.id == preexisting[0].id  # id should remain same

    # tool 1 likewise, fetched via its original id
    tool_1 = await server.tool_manager.get_tool_by_id_async(preexisting[1].id, default_user)
    assert tool_1.name == "existing_tool_1"  # name stays same
    assert tool_1.description == "Updated existing tool 1"
    assert "updated" in tool_1.tags

    # all three new tools were created
    for i in range(3, 6):
        fresh = await server.tool_manager.get_tool_by_name_async(f"new_tool_{i}", default_user)
        assert fresh is not None
        assert fresh.description == f"New tool {i}"
        assert "new" in fresh.tags

    # tool 2 was absent from the batch and must be untouched
    tool_2 = await server.tool_manager.get_tool_by_id_async(preexisting[2].id, default_user)
    assert tool_2.name == "existing_tool_2"
    assert tool_2.description == "Existing tool 2"
    assert tool_2.tags == ["existing"]
@pytest.mark.asyncio
async def test_create_tool_with_pip_requirements(server: SyncServer, default_user, default_organization):
def test_tool_with_deps():
@@ -3667,6 +3851,71 @@ def test_create_block(server: SyncServer, default_user):
assert block.metadata == block_create.metadata
@pytest.mark.asyncio
async def test_batch_create_blocks_async(server: SyncServer, default_user):
    """Test batch creating multiple blocks at once"""
    block_manager = BlockManager()

    # assemble five blocks with varying limits and per-block metadata
    to_create = [
        PydanticBlock(
            label=f"test_block_{i}",
            is_template=False,
            value=f"Content for block {i}",
            description=f"Test block {i} for batch operations",
            limit=1000 + i * 100,  # varying limits
            metadata={"index": i, "batch": "test"},
        )
        for i in range(5)
    ]

    created = await block_manager.batch_create_blocks_async(to_create, default_user)

    # every block comes back, with properties intact and a generated id
    assert len(created) == 5
    assert all(b.label.startswith("test_block_") for b in created)
    for i, blk in enumerate(created):
        assert blk.label == f"test_block_{i}"
        assert blk.value == f"Content for block {i}"
        assert blk.description == f"Test block {i} for batch operations"
        assert blk.limit == 1000 + i * 100
        assert blk.metadata["index"] == i
        assert blk.metadata["batch"] == "test"
        assert blk.id is not None  # should have generated ids
    # note: organization_id lives on the orm model, not the pydantic one

    # each created block should be individually retrievable
    for blk in created:
        fetched = await block_manager.get_block_by_id_async(blk.id, default_user)
        assert fetched.id == blk.id
        assert fetched.label == blk.label
        assert fetched.value == blk.value

    # empty input is a no-op
    assert await block_manager.batch_create_blocks_async([], default_user) == []

    # duplicate labels are allowed (no unique constraint): three separate rows
    dupes = [
        PydanticBlock(label="duplicate_label", value="Block 1"),
        PydanticBlock(label="duplicate_label", value="Block 2"),
        PydanticBlock(label="duplicate_label", value="Block 3"),
    ]
    created_dupes = await block_manager.batch_create_blocks_async(dupes, default_user)
    assert len(created_dupes) == 3
    assert all(b.label == "duplicate_label" for b in created_dupes)
    assert len({b.id for b in created_dupes}) == 3  # all unique ids
    assert {b.value for b in created_dupes} == {"Block 1", "Block 2", "Block 3"}
@pytest.mark.asyncio
async def test_get_blocks(server, default_user, event_loop):
block_manager = BlockManager()