diff --git a/letta/server/rest_api/routers/v1/git_http.py b/letta/server/rest_api/routers/v1/git_http.py index 7816857d..cdca169f 100644 --- a/letta/server/rest_api/routers/v1/git_http.py +++ b/letta/server/rest_api/routers/v1/git_http.py @@ -78,6 +78,7 @@ _repo_locks: Dict[str, threading.Lock] = {} # org_id for the currently-handled dulwich request (set by a WSGI wrapper). _current_org_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar("letta_git_http_org_id", default=None) + # Dulwich server globals _dulwich_server = None _dulwich_thread: Optional[threading.Thread] = None @@ -157,6 +158,38 @@ def _require_current_org_id() -> str: return org_id +def _resolve_org_id_from_wsgi_environ(environ: dict) -> Optional[str]: + """Resolve org_id for dulwich, preferring X-Organization-Id. + + This is used by the dulwich WSGI wrapper. If X-Organization-Id is missing, + we fall back to resolving via the authenticated user_id header. + + Note: dulwich is served on 127.0.0.1, so these headers should only be set by + our trusted in-pod proxy layer. + """ + + org_id = environ.get("HTTP_X_ORGANIZATION_ID") + if org_id: + return org_id + + user_id = environ.get("HTTP_USER_ID") + if not user_id: + return None + + if _server_instance is None: + return None + + try: + # We are in a dulwich WSGI thread; run async DB lookup in a fresh loop. + actor = asyncio.run(_server_instance.user_manager.get_actor_by_id_async(user_id)) + resolved = actor.organization_id + except Exception: + logger.exception("Failed to resolve org_id from user_id for dulwich request (user_id=%s)", user_id) + return None + + return resolved + + class GCSBackend(Backend): """Dulwich backend that materializes repos from GCS.""" @@ -475,8 +508,23 @@ async def proxy_git_http( actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) # Authorization check: ensure the actor can access this agent. await server.agent_manager.get_agent_by_id_async(agent_id=agent_id, actor=actor, include_relationships=[]) + + # Ensure we set exactly one X-Organization-Id header (avoid duplicate casing). + for k in list(req_headers.keys()): + if k.lower() == "x-organization-id": + req_headers.pop(k, None) # Use the authenticated actor's org; AgentState may not carry an organization field. - req_headers["x-organization-id"] = actor.organization_id + req_headers["X-Organization-Id"] = actor.organization_id + + logger.info( + "proxy_git_http: method=%s path=%s parsed_agent_id=%s actor_id=%s has_user_id_hdr=%s x_org_hdr=%s", + request.method, + path, + agent_id, + headers.actor_id, + bool(request.headers.get("user_id")), + req_headers.get("X-Organization-Id") or req_headers.get("x-organization-id"), + ) async def _body_iter(): async for chunk in request.stream(): @@ -528,17 +576,46 @@ def _org_header_middleware(app): FastAPI proxies requests to the dulwich server and injects `X-Organization-Id`. Dulwich itself only passes repository *paths* into the Backend, so we capture the org_id from the WSGI environ and stash it in a contextvar. + + Important: WSGI apps can return iterables/generators, and the server may + iterate the response body *after* this wrapper returns. We must therefore + keep the contextvar set for the duration of iteration. + + Defensive fallback: if X-Organization-Id is missing, attempt to derive org_id + from `user_id` (set by our auth proxy layer). """ def _wrapped(environ, start_response): - token = None + org_id = _resolve_org_id_from_wsgi_environ(environ) + + logger.info( + "dulwich_wsgi: path=%s remote=%s has_x_org=%s has_user_id=%s resolved_org=%s", + environ.get("PATH_INFO"), + environ.get("REMOTE_ADDR"), + bool(environ.get("HTTP_X_ORGANIZATION_ID")), + bool(environ.get("HTTP_USER_ID")), + org_id, + ) + + token = _current_org_id.set(org_id) + try: - org_id = environ.get("HTTP_X_ORGANIZATION_ID") - token = _current_org_id.set(org_id) - return app(environ, start_response) - finally: - if token is not None: - _current_org_id.reset(token) + app_iter = app(environ, start_response) + except Exception: + _current_org_id.reset(token) + raise + + def _iter(): + try: + yield from app_iter + finally: + try: + if hasattr(app_iter, "close"): + app_iter.close() + finally: + _current_org_id.reset(token) + + return _iter() return _wrapped