fix: Do the CTE in postgres via agent id join (#3717)
This commit is contained in:
@@ -248,53 +248,67 @@ def upgrade() -> None:
|
||||
batch_size = 1000
|
||||
processed_agents = 0
|
||||
|
||||
# process agents in batches
|
||||
while processed_agents < total_agents:
|
||||
print(f"Processing batch {processed_agents + 1} to {min(processed_agents + batch_size, total_agents)} of {total_agents}...")
|
||||
|
||||
# create archives for this batch of agents
|
||||
op.execute(
|
||||
sa.text(
|
||||
"""
|
||||
WITH batch_agents AS (
|
||||
SELECT DISTINCT a.id, a.name, a.organization_id
|
||||
FROM agent_passages ap
|
||||
JOIN agents a ON ap.agent_id = a.id
|
||||
WHERE ap.is_deleted = FALSE
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM archives_agents aa
|
||||
WHERE aa.agent_id = a.id
|
||||
)
|
||||
LIMIT :batch_size
|
||||
),
|
||||
inserted_archives AS (
|
||||
INSERT INTO archives (id, name, description, organization_id, created_at)
|
||||
SELECT
|
||||
'archive-' || gen_random_uuid(),
|
||||
COALESCE(ba.name, 'Agent ' || ba.id) || '''s Archive',
|
||||
'Default archive created during migration',
|
||||
ba.organization_id,
|
||||
NOW()
|
||||
FROM batch_agents ba
|
||||
RETURNING id as archive_id,
|
||||
organization_id,
|
||||
SUBSTRING(name FROM 1 FOR LENGTH(name) - LENGTH('''s Archive')) as agent_name
|
||||
# process agents one by one to maintain proper relationships
|
||||
offset = 0
|
||||
while offset < total_agents:
|
||||
# Get batch of agents that need archives
|
||||
batch_result = connection.execute(
|
||||
sa.text("""
|
||||
SELECT DISTINCT a.id, a.name, a.organization_id
|
||||
FROM agent_passages ap
|
||||
JOIN agents a ON ap.agent_id = a.id
|
||||
WHERE ap.is_deleted = FALSE
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM archives_agents aa
|
||||
WHERE aa.agent_id = a.id
|
||||
)
|
||||
-- create archives_agents relationships
|
||||
INSERT INTO archives_agents (agent_id, archive_id, is_owner, created_at)
|
||||
SELECT
|
||||
a.id as agent_id,
|
||||
ia.archive_id,
|
||||
TRUE,
|
||||
NOW()
|
||||
FROM inserted_archives ia
|
||||
JOIN agents a ON a.organization_id = ia.organization_id
|
||||
AND (a.name = ia.agent_name OR ('Agent ' || a.id) = ia.agent_name)
|
||||
"""
|
||||
).bindparams(batch_size=batch_size)
|
||||
ORDER BY a.id
|
||||
LIMIT :batch_size
|
||||
""").bindparams(batch_size=batch_size)
|
||||
)
|
||||
|
||||
processed_agents += batch_size
|
||||
|
||||
agents_batch = batch_result.fetchall()
|
||||
if not agents_batch:
|
||||
break # No more agents to process
|
||||
|
||||
batch_count = len(agents_batch)
|
||||
print(f"Processing batch of {batch_count} agents (offset: {offset})...")
|
||||
|
||||
# Create archive and relationship for each agent
|
||||
for agent_id, agent_name, org_id in agents_batch:
|
||||
try:
|
||||
# Create archive
|
||||
archive_result = connection.execute(
|
||||
sa.text("""
|
||||
INSERT INTO archives (id, name, description, organization_id, created_at)
|
||||
VALUES (
|
||||
'archive-' || gen_random_uuid(),
|
||||
:archive_name,
|
||||
'Default archive created during migration',
|
||||
:org_id,
|
||||
NOW()
|
||||
)
|
||||
RETURNING id
|
||||
""").bindparams(
|
||||
archive_name=f"{agent_name or f'Agent {agent_id}'}'s Archive",
|
||||
org_id=org_id
|
||||
)
|
||||
)
|
||||
archive_id = archive_result.scalar()
|
||||
|
||||
# Create agent-archive relationship
|
||||
connection.execute(
|
||||
sa.text("""
|
||||
INSERT INTO archives_agents (agent_id, archive_id, is_owner, created_at)
|
||||
VALUES (:agent_id, :archive_id, TRUE, NOW())
|
||||
""").bindparams(agent_id=agent_id, archive_id=archive_id)
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"Warning: Failed to create archive for agent {agent_id}: {e}")
|
||||
# Continue with other agents
|
||||
|
||||
offset += batch_count
|
||||
processed_agents = offset
|
||||
|
||||
print("Archive creation completed. Starting archive_id updates...")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user