fix(core): git HTTP org fallback + post-push sync (#9297)
This commit is contained in:
committed by
Caren Thomas
parent
530d33c254
commit
597c7f3e3e
@@ -75,6 +75,26 @@ _server_instance = None
|
||||
_repo_cache: Dict[str, str] = {}
|
||||
_repo_locks: Dict[str, threading.Lock] = {}
|
||||
|
||||
|
||||
def _dulwich_repo_path_marker_file(cache_key: str) -> str:
|
||||
"""Path to a marker file that stores the dulwich temp repo path.
|
||||
|
||||
Dulwich runs in-process and mutates a repo materialized into a temp directory.
|
||||
We then need to locate that same temp directory after the push to persist the
|
||||
updated `.git/` contents back to object storage.
|
||||
|
||||
In production we may have multiple FastAPI workers; in-memory `_repo_cache`
|
||||
is not shared across workers, so we store the repo_path in a small file under
|
||||
/tmp as a best-effort handoff. (Longer-term, we'll likely move dulwich to its
|
||||
own service/process and remove this.)
|
||||
"""
|
||||
|
||||
safe = cache_key.replace("/", "__")
|
||||
base = os.path.join(tempfile.gettempdir(), "letta-git-http")
|
||||
os.makedirs(base, exist_ok=True)
|
||||
return os.path.join(base, f"dulwich_repo_path__{safe}.txt")
|
||||
|
||||
|
||||
# org_id for the currently-handled dulwich request (set by a WSGI wrapper).
|
||||
_current_org_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar("letta_git_http_org_id", default=None)
|
||||
|
||||
@@ -235,10 +255,21 @@ class GCSBackend(Backend):
|
||||
old_repo_path = _repo_cache.pop(cache_key, None)
|
||||
if old_repo_path:
|
||||
shutil.rmtree(os.path.dirname(old_repo_path), ignore_errors=True)
|
||||
try:
|
||||
os.unlink(_dulwich_repo_path_marker_file(cache_key))
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
repo_path = self._download_repo_sync(agent_id=agent_id, org_id=org_id)
|
||||
_repo_cache[cache_key] = repo_path
|
||||
|
||||
# Persist repo_path for cross-worker post-push sync.
|
||||
try:
|
||||
with open(_dulwich_repo_path_marker_file(cache_key), "w") as f:
|
||||
f.write(repo_path)
|
||||
except Exception:
|
||||
logger.exception("Failed to write repo_path marker for %s", cache_key)
|
||||
|
||||
repo = Repo(repo_path)
|
||||
_prune_broken_refs(repo)
|
||||
return repo
|
||||
@@ -362,12 +393,31 @@ def _prune_broken_refs(repo: Repo) -> int:
|
||||
return removed
|
||||
|
||||
|
||||
async def _sync_after_push(org_id: str, agent_id: str) -> None:
|
||||
async def _sync_after_push(actor_id: str, agent_id: str) -> None:
|
||||
"""Sync repo back to GCS and PostgreSQL after a successful push."""
|
||||
|
||||
if _server_instance is None:
|
||||
logger.warning("Server instance not set; cannot sync after push")
|
||||
return
|
||||
|
||||
try:
|
||||
actor = await _server_instance.user_manager.get_actor_by_id_async(actor_id)
|
||||
except Exception:
|
||||
logger.exception("Failed to resolve actor for post-push sync (actor_id=%s)", actor_id)
|
||||
return
|
||||
|
||||
org_id = actor.organization_id
|
||||
cache_key = f"{org_id}/{agent_id}"
|
||||
|
||||
repo_path = _repo_cache.get(cache_key)
|
||||
if not repo_path:
|
||||
# Cross-worker fallback: read marker file written by the dulwich process.
|
||||
try:
|
||||
with open(_dulwich_repo_path_marker_file(cache_key), "r") as f:
|
||||
repo_path = f.read().strip() or None
|
||||
except FileNotFoundError:
|
||||
repo_path = None
|
||||
|
||||
if not repo_path:
|
||||
logger.warning("No cached repo for %s after push", cache_key)
|
||||
return
|
||||
@@ -397,41 +447,48 @@ async def _sync_after_push(org_id: str, agent_id: str) -> None:
|
||||
await asyncio.gather(*upload_tasks)
|
||||
logger.info("Uploaded %s files to GCS", len(upload_tasks))
|
||||
|
||||
# Sync blocks to Postgres (if using GitEnabledBlockManager)
|
||||
# Sync blocks to Postgres (if using GitEnabledBlockManager).
|
||||
#
|
||||
# Keep the same pattern as API-driven edits: read from the source of truth
|
||||
# in object storage after persisting the pushed refs/objects, rather than
|
||||
# relying on a working tree checkout under repo_path/.
|
||||
from letta.services.block_manager_git import GitEnabledBlockManager
|
||||
|
||||
if isinstance(_server_instance.block_manager, GitEnabledBlockManager):
|
||||
actor = await _server_instance.user_manager.get_actor_or_default_async(actor_id=None)
|
||||
|
||||
repo = Repo(repo_path)
|
||||
from dulwich.porcelain import reset
|
||||
|
||||
try:
|
||||
reset(repo, "hard")
|
||||
except Exception as e:
|
||||
logger.warning("Failed to reset repo: %s", e)
|
||||
files = await _server_instance.memory_repo_manager.git.get_files(
|
||||
agent_id=agent_id,
|
||||
org_id=org_id,
|
||||
ref="HEAD",
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Failed to read repo files from storage for post-push block sync (agent=%s)", agent_id)
|
||||
files = {}
|
||||
|
||||
blocks_dir = os.path.join(repo_path, "blocks")
|
||||
if os.path.exists(blocks_dir):
|
||||
for filename in os.listdir(blocks_dir):
|
||||
if not filename.endswith(".md"):
|
||||
continue
|
||||
synced = 0
|
||||
for file_path, content in files.items():
|
||||
if not file_path.startswith("blocks/") or not file_path.endswith(".md"):
|
||||
continue
|
||||
|
||||
label = filename[:-3]
|
||||
filepath = os.path.join(blocks_dir, filename)
|
||||
with open(filepath, "r") as f:
|
||||
value = f.read()
|
||||
label = file_path[len("blocks/") : -3]
|
||||
await _server_instance.block_manager._sync_block_to_postgres(
|
||||
agent_id=agent_id,
|
||||
label=label,
|
||||
value=content,
|
||||
actor=actor,
|
||||
)
|
||||
synced += 1
|
||||
logger.info("Synced block %s to PostgreSQL", label)
|
||||
|
||||
await _server_instance.block_manager._sync_block_to_postgres(
|
||||
agent_id=agent_id,
|
||||
label=label,
|
||||
value=value,
|
||||
actor=actor,
|
||||
)
|
||||
logger.info("Synced block %s to PostgreSQL", label)
|
||||
if synced == 0:
|
||||
logger.warning("No blocks/*.md files found in repo HEAD during post-push sync (agent=%s)", agent_id)
|
||||
|
||||
# Cleanup local cache
|
||||
_repo_cache.pop(cache_key, None)
|
||||
try:
|
||||
os.unlink(_dulwich_repo_path_marker_file(cache_key))
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
shutil.rmtree(os.path.dirname(repo_path), ignore_errors=True)
|
||||
|
||||
|
||||
@@ -551,7 +608,7 @@ async def proxy_git_http(
|
||||
# Authorization check: ensure the actor can access this agent.
|
||||
await server.agent_manager.get_agent_by_id_async(agent_id=agent_id, actor=actor, include_relationships=[])
|
||||
# Fire-and-forget; do not block git client response.
|
||||
asyncio.create_task(_sync_after_push(actor.organization_id, agent_id))
|
||||
asyncio.create_task(_sync_after_push(actor.id, agent_id))
|
||||
except Exception:
|
||||
logger.exception("Failed to trigger post-push sync (agent_id=%s)", agent_id)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user