feat: git smart HTTP for agent memory repos (#9257)
* feat(core): add git-backed memory repos and block manager Introduce a GCS-backed git repository per agent as the source of truth for core memory blocks. Add a GitEnabledBlockManager that writes block updates to git and syncs values back into Postgres as a cache. Default newly-created memory repos to the `main` branch. 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> * feat(core): serve memory repos over git smart HTTP Run dulwich's WSGI HTTPGitApplication on a local sidecar port and proxy /v1/git/* through FastAPI to support git clone/fetch/push directly against GCS-backed memory repos. 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> * fix(core): create memory repos on demand and stabilize git HTTP - Ensure MemoryRepoManager creates the git repo on first write (instead of 500ing) and avoids rewriting history by only auto-creating on FileNotFoundError. - Simplify dulwich-thread async execution and auto-create empty repos on first git clone. 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> * fix(core): make dulwich optional for CI installs Guard dulwich imports in the git smart HTTP router so the core server can boot (and CI tests can run) without installing the memory-repo extra. 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> * fix(core): guard git HTTP WSGI init when dulwich missing Avoid instantiating dulwich's HTTPGitApplication at import time when dulwich isn't installed (common in CI installs). 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> * fix(core): avoid masking send_message errors in finally Initialize `result` before the agent loop so error paths (e.g. approval validation) don't raise UnboundLocalError in the run-tracking finally block. 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> * fix(core): stop event loop watchdog on FastAPI shutdown Ensure the EventLoopWatchdog thread is stopped during FastAPI lifespan shutdown to avoid daemon threads logging during interpreter teardown (seen in CI unit tests). 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> * chore(core): remove send_*_message_to_agent from SyncServer Drop send_message_to_agent and send_group_message_to_agent from SyncServer and route internal fire-and-forget messaging through send_messages helpers instead. 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> * fix(core): backfill git memory repo when tag added When an agent is updated to include the git-memory-enabled tag, ensure the git-backed memory repo is created and initialized from the agent's current blocks. Also support configuring the memory repo object store via LETTA_OBJECT_STORE_URI. 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> * fix(core): preserve block tags on git-enabled updates When updating a block for a git-memory-enabled agent, keep block tags in sync with PostgreSQL (tags are not currently stored in the git repo). 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> * chore(core): remove git-state legacy shims - Rename optional dependency extra from memory-repo to git-state - Drop legacy object-store env aliases and unused region config - Simplify memory repo metadata to a single canonical format - Remove unused repo-cache invalidation helper 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> * fix(core): keep PR scope for git-backed blocks - Revert unrelated change in fire-and-forget multi-agent send helper - Route agent block updates-by-label through injected block manager only when needed 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> --------- Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
committed by
Caren Thomas
parent
16c96cc3c0
commit
50a60c1393
@@ -91,6 +91,7 @@ from letta.services.agent_manager import AgentManager
|
||||
from letta.services.agent_serialization_manager import AgentSerializationManager
|
||||
from letta.services.archive_manager import ArchiveManager
|
||||
from letta.services.block_manager import BlockManager
|
||||
from letta.services.block_manager_git import GIT_MEMORY_ENABLED_TAG, GitEnabledBlockManager
|
||||
from letta.services.file_manager import FileManager
|
||||
from letta.services.files_agents_manager import FileAgentManager
|
||||
from letta.services.group_manager import GroupManager
|
||||
@@ -104,6 +105,8 @@ from letta.services.mcp.sse_client import MCP_CONFIG_TOPLEVEL_KEY
|
||||
from letta.services.mcp.stdio_client import AsyncStdioMCPClient
|
||||
from letta.services.mcp_manager import MCPManager
|
||||
from letta.services.mcp_server_manager import MCPServerManager
|
||||
from letta.services.memory_repo import MemoryRepoManager
|
||||
from letta.services.memory_repo.storage.gcs import GCSStorageBackend
|
||||
from letta.services.message_manager import MessageManager
|
||||
from letta.services.organization_manager import OrganizationManager
|
||||
from letta.services.passage_manager import PassageManager
|
||||
@@ -165,13 +168,19 @@ class SyncServer(object):
|
||||
self.tool_manager = ToolManager()
|
||||
self.mcp_manager = MCPManager()
|
||||
self.mcp_server_manager = MCPServerManager()
|
||||
self.block_manager = BlockManager()
|
||||
self.memory_repo_manager = self._init_memory_repo_manager()
|
||||
# Use git-enabled block manager if memory repo is configured
|
||||
# It falls back to standard PostgreSQL behavior when git isn't enabled for an agent
|
||||
if self.memory_repo_manager:
|
||||
self.block_manager = GitEnabledBlockManager(memory_repo_manager=self.memory_repo_manager)
|
||||
else:
|
||||
self.block_manager = BlockManager()
|
||||
self.source_manager = SourceManager()
|
||||
self.sandbox_config_manager = SandboxConfigManager()
|
||||
self.message_manager = MessageManager()
|
||||
self.job_manager = JobManager()
|
||||
self.run_manager = RunManager()
|
||||
self.agent_manager = AgentManager()
|
||||
self.agent_manager = AgentManager(block_manager=self.block_manager)
|
||||
self.archive_manager = ArchiveManager()
|
||||
self.provider_manager = ProviderManager()
|
||||
self.step_manager = StepManager()
|
||||
@@ -416,6 +425,55 @@ class SyncServer(object):
|
||||
force_recreate=True,
|
||||
)
|
||||
|
||||
def _init_memory_repo_manager(self) -> Optional[MemoryRepoManager]:
|
||||
"""Initialize the memory repository manager if configured.
|
||||
|
||||
Configure the object store via settings (recommended):
|
||||
|
||||
LETTA_OBJECT_STORE_URI="gs://my-bucket/repository?project=my-gcp-project"
|
||||
|
||||
Supported schemes:
|
||||
- gs:// (or gcs://) -> Google Cloud Storage
|
||||
|
||||
Returns:
|
||||
MemoryRepoManager if configured, None otherwise
|
||||
"""
|
||||
|
||||
# Keep import local to avoid import/circular issues during server bootstrap.
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
|
||||
from letta.settings import settings
|
||||
|
||||
uri = settings.object_store_uri
|
||||
if not uri:
|
||||
logger.debug("Memory repo manager not configured (object_store_uri not set)")
|
||||
return None
|
||||
|
||||
try:
|
||||
parsed = urlparse(uri)
|
||||
scheme = (parsed.scheme or "").lower()
|
||||
|
||||
if scheme in {"gs", "gcs"}:
|
||||
bucket = parsed.netloc
|
||||
if not bucket:
|
||||
raise ValueError(f"Invalid GCS object store URI (missing bucket): {uri}")
|
||||
|
||||
# URI path is treated as the storage prefix
|
||||
prefix = parsed.path.lstrip("/") or "repository"
|
||||
qs = parse_qs(parsed.query)
|
||||
|
||||
# Allow settings-level overrides (handy for templated URIs).
|
||||
project = settings.object_store_project or (qs.get("project") or [None])[0]
|
||||
|
||||
storage = GCSStorageBackend(bucket=bucket, prefix=prefix, project=project)
|
||||
logger.info("Memory repo manager initialized with object store: %s", uri)
|
||||
return MemoryRepoManager(storage=storage)
|
||||
|
||||
raise ValueError(f"Unsupported object store scheme '{scheme}' in URI: {uri}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to initialize memory repo manager: {e}")
|
||||
return None
|
||||
|
||||
def _get_enabled_provider(self, provider_name: str) -> Optional[Provider]:
|
||||
"""Find and return an enabled provider by name.
|
||||
|
||||
@@ -571,13 +629,31 @@ class SyncServer(object):
|
||||
request.embedding_config = await self.get_embedding_config_from_handle_async(actor=actor, **embedding_config_params)
|
||||
log_event(name="end get_embedding_config_from_handle", attributes=embedding_config_params)
|
||||
|
||||
# If git-backed memory is requested on create, we enable it *after* agent creation.
|
||||
# We strip the tag during creation so `enable_git_memory_for_agent` can be the
|
||||
# single place that both creates the repo and writes the tag.
|
||||
wants_git_memory = bool(request.tags and GIT_MEMORY_ENABLED_TAG in request.tags)
|
||||
create_request = request
|
||||
if wants_git_memory:
|
||||
filtered_tags = [t for t in (request.tags or []) if t != GIT_MEMORY_ENABLED_TAG]
|
||||
create_request = request.model_copy(update={"tags": filtered_tags})
|
||||
|
||||
log_event(name="start create_agent db")
|
||||
main_agent = await self.agent_manager.create_agent_async(
|
||||
agent_create=request,
|
||||
agent_create=create_request,
|
||||
actor=actor,
|
||||
)
|
||||
log_event(name="end create_agent db")
|
||||
|
||||
# Enable git-backed memory (creates repo + commits initial blocks + adds tag)
|
||||
if wants_git_memory and isinstance(self.block_manager, GitEnabledBlockManager):
|
||||
await self.block_manager.enable_git_memory_for_agent(agent_id=main_agent.id, actor=actor)
|
||||
# Preserve the user's requested tags in the response model.
|
||||
try:
|
||||
main_agent.tags = list(request.tags or [])
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
log_event(name="start insert_files_into_context_window db")
|
||||
# Use folder_ids if provided, otherwise fall back to deprecated source_ids for backwards compatibility
|
||||
folder_ids_to_attach = request.folder_ids if request.folder_ids else request.source_ids
|
||||
@@ -650,12 +726,26 @@ class SyncServer(object):
|
||||
else:
|
||||
await self.create_sleeptime_agent_async(main_agent=agent, actor=actor)
|
||||
|
||||
return await self.agent_manager.update_agent_async(
|
||||
# If git-backed memory is requested via tag update, initialize/backfill the repo.
|
||||
wants_git_memory = bool(request.tags and GIT_MEMORY_ENABLED_TAG in request.tags)
|
||||
|
||||
updated_agent = await self.agent_manager.update_agent_async(
|
||||
agent_id=agent_id,
|
||||
agent_update=request,
|
||||
actor=actor,
|
||||
)
|
||||
|
||||
# Ensure repo exists and initial blocks are committed when the tag is present.
|
||||
if wants_git_memory and isinstance(self.block_manager, GitEnabledBlockManager):
|
||||
await self.block_manager.enable_git_memory_for_agent(agent_id=agent_id, actor=actor)
|
||||
# Preserve the user's requested tags in the response model.
|
||||
try:
|
||||
updated_agent.tags = list(request.tags or [])
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return updated_agent
|
||||
|
||||
async def create_sleeptime_agent_async(self, main_agent: AgentState, actor: User) -> Optional[AgentState]:
|
||||
if main_agent.embedding_config is None:
|
||||
logger.warning(f"Skipping sleeptime agent creation for agent {main_agent.id}: no embedding config provided")
|
||||
|
||||
Reference in New Issue
Block a user