From 74624e4499398e32e5038d18979b4b29772fcea0 Mon Sep 17 00:00:00 2001 From: Matthew Zhou Date: Sun, 3 Aug 2025 01:58:31 -0700 Subject: [PATCH] fix: Do the CTE in postgres via agent id join (#3717) --- ...4e860718e0d_add_archival_memory_sharing.py | 104 ++++++++++-------- 1 file changed, 59 insertions(+), 45 deletions(-) diff --git a/alembic/versions/74e860718e0d_add_archival_memory_sharing.py b/alembic/versions/74e860718e0d_add_archival_memory_sharing.py index 61c41631..727cb059 100644 --- a/alembic/versions/74e860718e0d_add_archival_memory_sharing.py +++ b/alembic/versions/74e860718e0d_add_archival_memory_sharing.py @@ -248,53 +248,67 @@ def upgrade() -> None: batch_size = 1000 processed_agents = 0 - # process agents in batches - while processed_agents < total_agents: - print(f"Processing batch {processed_agents + 1} to {min(processed_agents + batch_size, total_agents)} of {total_agents}...") - - # create archives for this batch of agents - op.execute( - sa.text( - """ - WITH batch_agents AS ( - SELECT DISTINCT a.id, a.name, a.organization_id - FROM agent_passages ap - JOIN agents a ON ap.agent_id = a.id - WHERE ap.is_deleted = FALSE - AND NOT EXISTS ( - SELECT 1 FROM archives_agents aa - WHERE aa.agent_id = a.id - ) - LIMIT :batch_size - ), - inserted_archives AS ( - INSERT INTO archives (id, name, description, organization_id, created_at) - SELECT - 'archive-' || gen_random_uuid(), - COALESCE(ba.name, 'Agent ' || ba.id) || '''s Archive', - 'Default archive created during migration', - ba.organization_id, - NOW() - FROM batch_agents ba - RETURNING id as archive_id, - organization_id, - SUBSTRING(name FROM 1 FOR LENGTH(name) - LENGTH('''s Archive')) as agent_name + # process agents one by one to maintain proper relationships + offset = 0 + while offset < total_agents: + # Get batch of agents that need archives + batch_result = connection.execute( + sa.text(""" + SELECT DISTINCT a.id, a.name, a.organization_id + FROM agent_passages ap + JOIN agents a ON ap.agent_id = a.id + WHERE ap.is_deleted = FALSE + AND NOT EXISTS ( + SELECT 1 FROM archives_agents aa + WHERE aa.agent_id = a.id ) - -- create archives_agents relationships - INSERT INTO archives_agents (agent_id, archive_id, is_owner, created_at) - SELECT - a.id as agent_id, - ia.archive_id, - TRUE, - NOW() - FROM inserted_archives ia - JOIN agents a ON a.organization_id = ia.organization_id - AND (a.name = ia.agent_name OR ('Agent ' || a.id) = ia.agent_name) - """ - ).bindparams(batch_size=batch_size) + ORDER BY a.id + LIMIT :batch_size + """).bindparams(batch_size=batch_size) ) - - processed_agents += batch_size + + agents_batch = batch_result.fetchall() + if not agents_batch: + break # No more agents to process + + batch_count = len(agents_batch) + print(f"Processing batch of {batch_count} agents (offset: {offset})...") + + # Create archive and relationship for each agent + for agent_id, agent_name, org_id in agents_batch: + try: + # Create archive + archive_result = connection.execute( + sa.text(""" + INSERT INTO archives (id, name, description, organization_id, created_at) + VALUES ( + 'archive-' || gen_random_uuid(), + :archive_name, + 'Default archive created during migration', + :org_id, + NOW() + ) + RETURNING id + """).bindparams( + archive_name=f"{agent_name or f'Agent {agent_id}'}'s Archive", + org_id=org_id + ) + ) + archive_id = archive_result.scalar() + + # Create agent-archive relationship + connection.execute( + sa.text(""" + INSERT INTO archives_agents (agent_id, archive_id, is_owner, created_at) + VALUES (:agent_id, :archive_id, TRUE, NOW()) + """).bindparams(agent_id=agent_id, archive_id=archive_id) + ) + except Exception as e: + print(f"Warning: Failed to create archive for agent {agent_id}: {e}") + # Continue with other agents + + offset += batch_count + processed_agents = offset print("Archive creation completed. Starting archive_id updates...")