fix(core): derive dulwich org context from user_id fallback (#9296)
* fix(core): derive dulwich org context from user_id fallback Make git smart HTTP more robust in prod by: - normalizing/injecting a single X-Organization-Id header in the FastAPI proxy - keeping dulwich org contextvar set through WSGI iteration - falling back to resolving org_id from user_id when X-Organization-Id is missing - adding opt-in debug logs (env LETTA_GIT_HTTP_DEBUG_LOGS or letta_debug query) 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> * refactor(core): drop user->org cache in dulwich org fallback Keep the dulwich org_id fallback simple by resolving org_id from user_id via UserManager lookup when X-Organization-Id is missing, without maintaining an in-process cache. 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> * chore(core): make git HTTP debug logging always-on Remove opt-in toggles for git HTTP debug logs and log proxy + dulwich request context for every git smart-HTTP request. 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> --------- Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
committed by
Caren Thomas
parent
c801866d89
commit
fdc32f6054
@@ -78,6 +78,7 @@ _repo_locks: Dict[str, threading.Lock] = {}
|
|||||||
# org_id for the currently-handled dulwich request (set by a WSGI wrapper).
|
# org_id for the currently-handled dulwich request (set by a WSGI wrapper).
|
||||||
_current_org_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar("letta_git_http_org_id", default=None)
|
_current_org_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar("letta_git_http_org_id", default=None)
|
||||||
|
|
||||||
|
|
||||||
# Dulwich server globals
|
# Dulwich server globals
|
||||||
_dulwich_server = None
|
_dulwich_server = None
|
||||||
_dulwich_thread: Optional[threading.Thread] = None
|
_dulwich_thread: Optional[threading.Thread] = None
|
||||||
@@ -157,6 +158,38 @@ def _require_current_org_id() -> str:
|
|||||||
return org_id
|
return org_id
|
||||||
|
|
||||||
|
|
||||||
|
def _resolve_org_id_from_wsgi_environ(environ: dict) -> Optional[str]:
|
||||||
|
"""Resolve org_id for dulwich, preferring X-Organization-Id.
|
||||||
|
|
||||||
|
This is used by the dulwich WSGI wrapper. If X-Organization-Id is missing,
|
||||||
|
we fall back to resolving via the authenticated user_id header.
|
||||||
|
|
||||||
|
Note: dulwich is served on 127.0.0.1, so these headers should only be set by
|
||||||
|
our trusted in-pod proxy layer.
|
||||||
|
"""
|
||||||
|
|
||||||
|
org_id = environ.get("HTTP_X_ORGANIZATION_ID")
|
||||||
|
if org_id:
|
||||||
|
return org_id
|
||||||
|
|
||||||
|
user_id = environ.get("HTTP_USER_ID")
|
||||||
|
if not user_id:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if _server_instance is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
# We are in a dulwich WSGI thread; run async DB lookup in a fresh loop.
|
||||||
|
actor = asyncio.run(_server_instance.user_manager.get_actor_by_id_async(user_id))
|
||||||
|
resolved = actor.organization_id
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Failed to resolve org_id from user_id for dulwich request (user_id=%s)", user_id)
|
||||||
|
return None
|
||||||
|
|
||||||
|
return resolved
|
||||||
|
|
||||||
|
|
||||||
class GCSBackend(Backend):
|
class GCSBackend(Backend):
|
||||||
"""Dulwich backend that materializes repos from GCS."""
|
"""Dulwich backend that materializes repos from GCS."""
|
||||||
|
|
||||||
@@ -475,8 +508,23 @@ async def proxy_git_http(
|
|||||||
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
|
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
|
||||||
# Authorization check: ensure the actor can access this agent.
|
# Authorization check: ensure the actor can access this agent.
|
||||||
await server.agent_manager.get_agent_by_id_async(agent_id=agent_id, actor=actor, include_relationships=[])
|
await server.agent_manager.get_agent_by_id_async(agent_id=agent_id, actor=actor, include_relationships=[])
|
||||||
|
|
||||||
|
# Ensure we set exactly one X-Organization-Id header (avoid duplicate casing).
|
||||||
|
for k in list(req_headers.keys()):
|
||||||
|
if k.lower() == "x-organization-id":
|
||||||
|
req_headers.pop(k, None)
|
||||||
# Use the authenticated actor's org; AgentState may not carry an organization field.
|
# Use the authenticated actor's org; AgentState may not carry an organization field.
|
||||||
req_headers["x-organization-id"] = actor.organization_id
|
req_headers["X-Organization-Id"] = actor.organization_id
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"proxy_git_http: method=%s path=%s parsed_agent_id=%s actor_id=%s has_user_id_hdr=%s x_org_hdr=%s",
|
||||||
|
request.method,
|
||||||
|
path,
|
||||||
|
agent_id,
|
||||||
|
headers.actor_id,
|
||||||
|
bool(request.headers.get("user_id")),
|
||||||
|
req_headers.get("X-Organization-Id") or req_headers.get("x-organization-id"),
|
||||||
|
)
|
||||||
|
|
||||||
async def _body_iter():
|
async def _body_iter():
|
||||||
async for chunk in request.stream():
|
async for chunk in request.stream():
|
||||||
@@ -528,17 +576,46 @@ def _org_header_middleware(app):
|
|||||||
FastAPI proxies requests to the dulwich server and injects `X-Organization-Id`.
|
FastAPI proxies requests to the dulwich server and injects `X-Organization-Id`.
|
||||||
Dulwich itself only passes repository *paths* into the Backend, so we capture
|
Dulwich itself only passes repository *paths* into the Backend, so we capture
|
||||||
the org_id from the WSGI environ and stash it in a contextvar.
|
the org_id from the WSGI environ and stash it in a contextvar.
|
||||||
|
|
||||||
|
Important: WSGI apps can return iterables/generators, and the server may
|
||||||
|
iterate the response body *after* this wrapper returns. We must therefore
|
||||||
|
keep the contextvar set for the duration of iteration.
|
||||||
|
|
||||||
|
Defensive fallback: if X-Organization-Id is missing, attempt to derive org_id
|
||||||
|
from `user_id` (set by our auth proxy layer).
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def _wrapped(environ, start_response):
|
def _wrapped(environ, start_response):
|
||||||
token = None
|
org_id = _resolve_org_id_from_wsgi_environ(environ)
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"dulwich_wsgi: path=%s remote=%s has_x_org=%s has_user_id=%s resolved_org=%s",
|
||||||
|
environ.get("PATH_INFO"),
|
||||||
|
environ.get("REMOTE_ADDR"),
|
||||||
|
bool(environ.get("HTTP_X_ORGANIZATION_ID")),
|
||||||
|
bool(environ.get("HTTP_USER_ID")),
|
||||||
|
org_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
token = _current_org_id.set(org_id)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
org_id = environ.get("HTTP_X_ORGANIZATION_ID")
|
app_iter = app(environ, start_response)
|
||||||
token = _current_org_id.set(org_id)
|
except Exception:
|
||||||
return app(environ, start_response)
|
_current_org_id.reset(token)
|
||||||
finally:
|
raise
|
||||||
if token is not None:
|
|
||||||
_current_org_id.reset(token)
|
def _iter():
|
||||||
|
try:
|
||||||
|
yield from app_iter
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
if hasattr(app_iter, "close"):
|
||||||
|
app_iter.close()
|
||||||
|
finally:
|
||||||
|
_current_org_id.reset(token)
|
||||||
|
|
||||||
|
return _iter()
|
||||||
|
|
||||||
return _wrapped
|
return _wrapped
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user