def _dulwich_repo_path_marker_file(cache_key: str) -> str:
    """Return the path of the marker file that records a dulwich temp repo path.

    Dulwich runs in-process and mutates a repo materialized into a temp
    directory. After a push, that same directory has to be located again so the
    updated ``.git/`` contents can be persisted back to object storage.

    In production there may be multiple FastAPI workers; the in-memory
    ``_repo_cache`` is not shared across workers, so the repo path is stored in
    a small file under the system temp directory as a best-effort handoff.
    (Longer-term, dulwich will likely move to its own service/process and this
    can be removed.)

    Side effect: creates the ``letta-git-http`` directory under
    ``tempfile.gettempdir()`` if it does not exist yet.

    Args:
        cache_key: Repo cache key, built by callers as ``"{org_id}/{agent_id}"``.

    Returns:
        Path of the marker file for ``cache_key`` inside the marker directory.
    """

    # Flatten the key into a single file name. Every separator the platform
    # recognises is replaced (not just "/") so the key can never address a
    # location outside the marker directory.
    safe = cache_key
    for sep in ("/", os.sep, os.altsep):
        if sep:
            safe = safe.replace(sep, "__")

    base = os.path.join(tempfile.gettempdir(), "letta-git-http")
    # The marker's content is later trusted as a directory to read and delete,
    # and the temp dir is shared between users, so keep the directory private
    # when this process is the one creating it.
    os.makedirs(base, mode=0o700, exist_ok=True)
    return os.path.join(base, f"dulwich_repo_path__{safe}.txt")
+ try: + with open(_dulwich_repo_path_marker_file(cache_key), "w") as f: + f.write(repo_path) + except Exception: + logger.exception("Failed to write repo_path marker for %s", cache_key) + repo = Repo(repo_path) _prune_broken_refs(repo) return repo @@ -362,12 +393,31 @@ def _prune_broken_refs(repo: Repo) -> int: return removed -async def _sync_after_push(org_id: str, agent_id: str) -> None: +async def _sync_after_push(actor_id: str, agent_id: str) -> None: """Sync repo back to GCS and PostgreSQL after a successful push.""" + if _server_instance is None: + logger.warning("Server instance not set; cannot sync after push") + return + + try: + actor = await _server_instance.user_manager.get_actor_by_id_async(actor_id) + except Exception: + logger.exception("Failed to resolve actor for post-push sync (actor_id=%s)", actor_id) + return + + org_id = actor.organization_id cache_key = f"{org_id}/{agent_id}" repo_path = _repo_cache.get(cache_key) + if not repo_path: + # Cross-worker fallback: read marker file written by the dulwich process. + try: + with open(_dulwich_repo_path_marker_file(cache_key), "r") as f: + repo_path = f.read().strip() or None + except FileNotFoundError: + repo_path = None + if not repo_path: logger.warning("No cached repo for %s after push", cache_key) return @@ -397,41 +447,48 @@ async def _sync_after_push(org_id: str, agent_id: str) -> None: await asyncio.gather(*upload_tasks) logger.info("Uploaded %s files to GCS", len(upload_tasks)) - # Sync blocks to Postgres (if using GitEnabledBlockManager) + # Sync blocks to Postgres (if using GitEnabledBlockManager). + # + # Keep the same pattern as API-driven edits: read from the source of truth + # in object storage after persisting the pushed refs/objects, rather than + # relying on a working tree checkout under repo_path/. 
from letta.services.block_manager_git import GitEnabledBlockManager if isinstance(_server_instance.block_manager, GitEnabledBlockManager): - actor = await _server_instance.user_manager.get_actor_or_default_async(actor_id=None) - - repo = Repo(repo_path) - from dulwich.porcelain import reset - try: - reset(repo, "hard") - except Exception as e: - logger.warning("Failed to reset repo: %s", e) + files = await _server_instance.memory_repo_manager.git.get_files( + agent_id=agent_id, + org_id=org_id, + ref="HEAD", + ) + except Exception: + logger.exception("Failed to read repo files from storage for post-push block sync (agent=%s)", agent_id) + files = {} - blocks_dir = os.path.join(repo_path, "blocks") - if os.path.exists(blocks_dir): - for filename in os.listdir(blocks_dir): - if not filename.endswith(".md"): - continue + synced = 0 + for file_path, content in files.items(): + if not file_path.startswith("blocks/") or not file_path.endswith(".md"): + continue - label = filename[:-3] - filepath = os.path.join(blocks_dir, filename) - with open(filepath, "r") as f: - value = f.read() + label = file_path[len("blocks/") : -3] + await _server_instance.block_manager._sync_block_to_postgres( + agent_id=agent_id, + label=label, + value=content, + actor=actor, + ) + synced += 1 + logger.info("Synced block %s to PostgreSQL", label) - await _server_instance.block_manager._sync_block_to_postgres( - agent_id=agent_id, - label=label, - value=value, - actor=actor, - ) - logger.info("Synced block %s to PostgreSQL", label) + if synced == 0: + logger.warning("No blocks/*.md files found in repo HEAD during post-push sync (agent=%s)", agent_id) # Cleanup local cache _repo_cache.pop(cache_key, None) + try: + os.unlink(_dulwich_repo_path_marker_file(cache_key)) + except FileNotFoundError: + pass shutil.rmtree(os.path.dirname(repo_path), ignore_errors=True) @@ -551,7 +608,7 @@ async def proxy_git_http( # Authorization check: ensure the actor can access this agent. 
await server.agent_manager.get_agent_by_id_async(agent_id=agent_id, actor=actor, include_relationships=[]) # Fire-and-forget; do not block git client response. - asyncio.create_task(_sync_after_push(actor.organization_id, agent_id)) + asyncio.create_task(_sync_after_push(actor.id, agent_id)) except Exception: logger.exception("Failed to trigger post-push sync (agent_id=%s)", agent_id)