From 50a60c139307d94fb9842ca3fb0928320ee7a9b0 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Tue, 3 Feb 2026 22:55:46 -0800 Subject: [PATCH] feat: git smart HTTP for agent memory repos (#9257) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(core): add git-backed memory repos and block manager Introduce a GCS-backed git repository per agent as the source of truth for core memory blocks. Add a GitEnabledBlockManager that writes block updates to git and syncs values back into Postgres as a cache. Default newly-created memory repos to the `main` branch. 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta * feat(core): serve memory repos over git smart HTTP Run dulwich's WSGI HTTPGitApplication on a local sidecar port and proxy /v1/git/* through FastAPI to support git clone/fetch/push directly against GCS-backed memory repos. 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta * fix(core): create memory repos on demand and stabilize git HTTP - Ensure MemoryRepoManager creates the git repo on first write (instead of 500ing) and avoids rewriting history by only auto-creating on FileNotFoundError. - Simplify dulwich-thread async execution and auto-create empty repos on first git clone. 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta * fix(core): make dulwich optional for CI installs Guard dulwich imports in the git smart HTTP router so the core server can boot (and CI tests can run) without installing the memory-repo extra. 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta * fix(core): guard git HTTP WSGI init when dulwich missing Avoid instantiating dulwich's HTTPGitApplication at import time when dulwich isn't installed (common in CI installs). 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta * fix(core): avoid masking send_message errors in finally Initialize `result` before the agent loop so error paths (e.g. 
approval validation) don't raise UnboundLocalError in the run-tracking finally block. 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta * fix(core): stop event loop watchdog on FastAPI shutdown Ensure the EventLoopWatchdog thread is stopped during FastAPI lifespan shutdown to avoid daemon threads logging during interpreter teardown (seen in CI unit tests). 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta * chore(core): remove send_*_message_to_agent from SyncServer Drop send_message_to_agent and send_group_message_to_agent from SyncServer and route internal fire-and-forget messaging through send_messages helpers instead. 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta * fix(core): backfill git memory repo when tag added When an agent is updated to include the git-memory-enabled tag, ensure the git-backed memory repo is created and initialized from the agent's current blocks. Also support configuring the memory repo object store via LETTA_OBJECT_STORE_URI. 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta * fix(core): preserve block tags on git-enabled updates When updating a block for a git-memory-enabled agent, keep block tags in sync with PostgreSQL (tags are not currently stored in the git repo). 
👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta * chore(core): remove git-state legacy shims - Rename optional dependency extra from memory-repo to git-state - Drop legacy object-store env aliases and unused region config - Simplify memory repo metadata to a single canonical format - Remove unused repo-cache invalidation helper 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta * fix(core): keep PR scope for git-backed blocks - Revert unrelated change in fire-and-forget multi-agent send helper - Route agent block updates-by-label through injected block manager only when needed 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta --------- Co-authored-by: Letta --- letta/constants.py | 4 + letta/data_sources/redis_client.py | 80 ++- letta/errors.py | 31 +- letta/schemas/memory_repo.py | 44 ++ letta/server/rest_api/app.py | 33 +- letta/server/rest_api/routers/v1/__init__.py | 2 + letta/server/rest_api/routers/v1/git_http.py | 506 +++++++++++++++ letta/server/server.py | 98 ++- letta/services/agent_manager.py | 64 +- letta/services/block_manager.py | 23 + letta/services/block_manager_git.py | 511 +++++++++++++++ letta/services/memory_repo/__init__.py | 11 + letta/services/memory_repo/git_operations.py | 599 ++++++++++++++++++ .../services/memory_repo/storage/__init__.py | 9 + letta/services/memory_repo/storage/base.py | 127 ++++ letta/settings.py | 18 + pyproject.toml | 6 + uv.lock | 135 +++- 18 files changed, 2254 insertions(+), 47 deletions(-) create mode 100644 letta/schemas/memory_repo.py create mode 100644 letta/server/rest_api/routers/v1/git_http.py create mode 100644 letta/services/block_manager_git.py create mode 100644 letta/services/memory_repo/__init__.py create mode 100644 letta/services/memory_repo/git_operations.py create mode 100644 letta/services/memory_repo/storage/__init__.py create mode 100644 letta/services/memory_repo/storage/base.py diff --git a/letta/constants.py b/letta/constants.py 
index 81f77df6..7121913a 100644 --- a/letta/constants.py +++ b/letta/constants.py @@ -457,6 +457,10 @@ REDIS_RUN_ID_PREFIX = "agent:send_message:run_id" CONVERSATION_LOCK_PREFIX = "conversation:lock:" CONVERSATION_LOCK_TTL_SECONDS = 300 # 5 minutes +# Memory repo locks - prevents concurrent modifications to git-based memory +MEMORY_REPO_LOCK_PREFIX = "memory_repo:lock:" +MEMORY_REPO_LOCK_TTL_SECONDS = 60 # 1 minute (git operations should be fast) + # TODO: This is temporary, eventually use token-based eviction # File based controls DEFAULT_MAX_FILES_OPEN = 5 diff --git a/letta/data_sources/redis_client.py b/letta/data_sources/redis_client.py index 8c4f5fec..5bbb6987 100644 --- a/letta/data_sources/redis_client.py +++ b/letta/data_sources/redis_client.py @@ -2,8 +2,16 @@ import asyncio from functools import wraps from typing import Any, Dict, List, Optional, Set, Union -from letta.constants import CONVERSATION_LOCK_PREFIX, CONVERSATION_LOCK_TTL_SECONDS, REDIS_EXCLUDE, REDIS_INCLUDE, REDIS_SET_DEFAULT_VAL -from letta.errors import ConversationBusyError +from letta.constants import ( + CONVERSATION_LOCK_PREFIX, + CONVERSATION_LOCK_TTL_SECONDS, + MEMORY_REPO_LOCK_PREFIX, + MEMORY_REPO_LOCK_TTL_SECONDS, + REDIS_EXCLUDE, + REDIS_INCLUDE, + REDIS_SET_DEFAULT_VAL, +) +from letta.errors import ConversationBusyError, MemoryRepoBusyError from letta.log import get_logger from letta.settings import settings @@ -230,6 +238,64 @@ class AsyncRedisClient: logger.warning(f"Failed to release conversation lock for conversation {conversation_id}: {e}") return False + async def acquire_memory_repo_lock( + self, + agent_id: str, + token: str, + ) -> Optional["Lock"]: + """ + Acquire a distributed lock for a memory repository. + + Prevents concurrent modifications to an agent's git-based memory. 
+ + Args: + agent_id: The agent ID whose memory is being modified + token: Unique identifier for the lock holder (for debugging/tracing) + + Returns: + Lock object if acquired, raises MemoryRepoBusyError if in use + """ + if Lock is None: + return None + client = await self.get_client() + lock_key = f"{MEMORY_REPO_LOCK_PREFIX}{agent_id}" + lock = Lock( + client, + lock_key, + timeout=MEMORY_REPO_LOCK_TTL_SECONDS, + blocking=False, + thread_local=False, + raise_on_release_error=False, + ) + + if await lock.acquire(token=token): + return lock + + lock_holder_token = await client.get(lock_key) + raise MemoryRepoBusyError( + agent_id=agent_id, + lock_holder_token=lock_holder_token, + ) + + async def release_memory_repo_lock(self, agent_id: str) -> bool: + """ + Release a memory repo lock by agent_id. + + Args: + agent_id: The agent ID to release the lock for + + Returns: + True if lock was released, False if release failed + """ + try: + client = await self.get_client() + lock_key = f"{MEMORY_REPO_LOCK_PREFIX}{agent_id}" + await client.delete(lock_key) + return True + except Exception as e: + logger.warning(f"Failed to release memory repo lock for agent {agent_id}: {e}") + return False + @with_retry() async def exists(self, *keys: str) -> int: """Check if keys exist.""" @@ -464,6 +530,16 @@ class NoopAsyncRedisClient(AsyncRedisClient): async def release_conversation_lock(self, conversation_id: str) -> bool: return False + async def acquire_memory_repo_lock( + self, + agent_id: str, + token: str, + ) -> Optional["Lock"]: + return None + + async def release_memory_repo_lock(self, agent_id: str) -> bool: + return False + async def check_inclusion_and_exclusion(self, member: str, group: str) -> bool: return False diff --git a/letta/errors.py b/letta/errors.py index 5a2eb849..e21413a2 100644 --- a/letta/errors.py +++ b/letta/errors.py @@ -91,6 +91,22 @@ class ConversationBusyError(LettaError): super().__init__(message=message, code=code, details=details) +class 
MemoryRepoBusyError(LettaError): + """Error raised when attempting to modify memory while another operation is in progress.""" + + def __init__(self, agent_id: str, lock_holder_token: Optional[str] = None): + self.agent_id = agent_id + self.lock_holder_token = lock_holder_token + message = "Cannot modify memory: Another operation is currently in progress for this agent's memory. Please wait for the current operation to complete." + code = ErrorCode.CONFLICT + details = { + "error_code": "MEMORY_REPO_BUSY", + "agent_id": agent_id, + "lock_holder_token": lock_holder_token, + } + super().__init__(message=message, code=code, details=details) + + class LettaToolCreateError(LettaError): """Error raised when a tool cannot be created.""" @@ -167,7 +183,9 @@ class LettaImageFetchError(LettaError): def __init__(self, url: str, reason: str): details = {"url": url, "reason": reason} super().__init__( - message=f"Failed to fetch image from {url}: {reason}", code=ErrorCode.INVALID_ARGUMENT, details=details, + message=f"Failed to fetch image from {url}: {reason}", + code=ErrorCode.INVALID_ARGUMENT, + details=details, ) @@ -308,7 +326,9 @@ class ContextWindowExceededError(LettaError): def __init__(self, message: str, details: dict = {}): error_message = f"{message} ({details})" super().__init__( - message=error_message, code=ErrorCode.CONTEXT_WINDOW_EXCEEDED, details=details, + message=error_message, + code=ErrorCode.CONTEXT_WINDOW_EXCEEDED, + details=details, ) @@ -328,7 +348,9 @@ class RateLimitExceededError(LettaError): def __init__(self, message: str, max_retries: int): error_message = f"{message} ({max_retries})" super().__init__( - message=error_message, code=ErrorCode.RATE_LIMIT_EXCEEDED, details={"max_retries": max_retries}, + message=error_message, + code=ErrorCode.RATE_LIMIT_EXCEEDED, + details={"max_retries": max_retries}, ) @@ -383,7 +405,8 @@ class HandleNotFoundError(LettaError): def __init__(self, handle: str, available_handles: List[str]): super().__init__( - 
message=f"Handle {handle} not found, must be one of {available_handles}", code=ErrorCode.NOT_FOUND, + message=f"Handle {handle} not found, must be one of {available_handles}", + code=ErrorCode.NOT_FOUND, ) diff --git a/letta/schemas/memory_repo.py b/letta/schemas/memory_repo.py new file mode 100644 index 00000000..c306a767 --- /dev/null +++ b/letta/schemas/memory_repo.py @@ -0,0 +1,44 @@ +"""Pydantic schemas for git-based memory repositories. + +These are used internally by the git-backed block/memory repository services. + +Note: REST "sync" request/response schemas were removed when we switched to +clients interacting with repositories directly via git smart HTTP. +""" + +from __future__ import annotations + +from datetime import datetime +from typing import List, Optional + +from pydantic import Field + +from letta.schemas.letta_base import LettaBase + + +class MemoryCommit(LettaBase): + """Represents a commit in the memory repository.""" + + __id_prefix__ = "memcommit" + + sha: str = Field(..., description="Commit SHA (40-char hex).") + parent_sha: Optional[str] = Field(None, description="Parent commit SHA.") + message: str = Field(..., description="Commit message.") + + author_type: str = Field(..., description="Author type: agent, user, system.") + author_id: str = Field(..., description="Author ID.") + author_name: Optional[str] = Field(None, description="Human-readable author name.") + + timestamp: datetime = Field(..., description="Commit timestamp.") + + files_changed: List[str] = Field(default_factory=list, description="List of changed file paths.") + additions: int = Field(default=0, description="Number of lines/chars added.") + deletions: int = Field(default=0, description="Number of lines/chars deleted.") + + +class FileChange(LettaBase): + """Represents a file change for committing.""" + + path: str = Field(..., description="File path within repository.") + content: Optional[str] = Field(None, description="New file content (None for delete).") + 
change_type: str = Field(default="modify", description="Change type: add, modify, delete.") diff --git a/letta/server/rest_api/app.py b/letta/server/rest_api/app.py index 60f1b123..b58d083e 100644 --- a/letta/server/rest_api/app.py +++ b/letta/server/rest_api/app.py @@ -192,6 +192,17 @@ async def lifespan(app_: FastAPI): logger.info(f"[Worker {worker_id}] Starting scheduler with leader election") global server await server.init_async(init_with_default_org_and_user=not settings.no_default_actor) + + # Set server instance for git HTTP endpoints and start dulwich sidecar + try: + from letta.server.rest_api.routers.v1.git_http import set_server_instance, start_dulwich_server + + set_server_instance(server) + start_dulwich_server() + logger.info(f"[Worker {worker_id}] Git HTTP server instance set (dulwich sidecar started)") + except Exception as e: + logger.warning(f"[Worker {worker_id}] Failed to start git HTTP sidecar: {e}") + try: await start_scheduler_with_leader_election(server) logger.info(f"[Worker {worker_id}] Scheduler initialization completed") @@ -203,6 +214,15 @@ async def lifespan(app_: FastAPI): # Cleanup on shutdown logger.info(f"[Worker {worker_id}] Starting lifespan shutdown") + # Stop watchdog thread (important for clean test/worker shutdown) + try: + from letta.monitoring.event_loop_watchdog import stop_watchdog + + stop_watchdog() + logger.info(f"[Worker {worker_id}] Event loop watchdog stopped") + except Exception as e: + logger.warning(f"[Worker {worker_id}] Failed to stop watchdog: {e}") + try: from letta.jobs.scheduler import shutdown_scheduler_and_release_lock @@ -221,17 +241,6 @@ async def lifespan(app_: FastAPI): except Exception as e: logger.warning(f"[Worker {worker_id}] SQLAlchemy instrumentation shutdown failed: {e}") - # Shutdown LLM raw trace writer (closes ClickHouse connection) - if settings.store_llm_traces: - try: - from letta.services.llm_trace_writer import get_llm_trace_writer - - writer = get_llm_trace_writer() - await 
writer.shutdown_async() - logger.info(f"[Worker {worker_id}] LLM raw trace writer shutdown completed") - except Exception as e: - logger.warning(f"[Worker {worker_id}] LLM raw trace writer shutdown failed: {e}") - logger.info(f"[Worker {worker_id}] Lifespan shutdown completed") @@ -715,6 +724,8 @@ def create_application() -> "FastAPI": # /api/auth endpoints app.include_router(setup_auth_router(server, interface, random_password), prefix=API_PREFIX) + # Git smart HTTP is served by a dulwich sidecar and proxied by the /v1/git router. + # / static files mount_static_files(app) diff --git a/letta/server/rest_api/routers/v1/__init__.py b/letta/server/rest_api/routers/v1/__init__.py index c75f715a..f7293eb2 100644 --- a/letta/server/rest_api/routers/v1/__init__.py +++ b/letta/server/rest_api/routers/v1/__init__.py @@ -6,6 +6,7 @@ from letta.server.rest_api.routers.v1.chat_completions import router as chat_com from letta.server.rest_api.routers.v1.conversations import router as conversations_router from letta.server.rest_api.routers.v1.embeddings import router as embeddings_router from letta.server.rest_api.routers.v1.folders import router as folders_router +from letta.server.rest_api.routers.v1.git_http import router as git_http_router from letta.server.rest_api.routers.v1.groups import router as groups_router from letta.server.rest_api.routers.v1.health import router as health_router from letta.server.rest_api.routers.v1.identities import router as identities_router @@ -39,6 +40,7 @@ ROUTERS = [ agents_router, conversations_router, chat_completions_router, + git_http_router, groups_router, identities_router, internal_agents_router, diff --git a/letta/server/rest_api/routers/v1/git_http.py b/letta/server/rest_api/routers/v1/git_http.py new file mode 100644 index 00000000..c2d33266 --- /dev/null +++ b/letta/server/rest_api/routers/v1/git_http.py @@ -0,0 +1,506 @@ +"""Git HTTP Smart Protocol endpoints via dulwich (proxied). + +## Why a separate dulwich server? 
+ +Dulwich's `HTTPGitApplication` is a WSGI app and relies on the WSGI `write()` +callback pattern. Starlette's `WSGIMiddleware` does not fully support this +pattern, which causes failures when mounting dulwich directly into FastAPI. + +To avoid the ASGI/WSGI impedance mismatch, we run dulwich's WSGI server on a +separate local port (default: 8284) and proxy `/v1/git/*` requests to it. + +Example: + + git clone http://localhost:8283/v1/git/{agent_id}/state.git + +Routes (smart HTTP): + GET /v1/git/{agent_id}/state.git/info/refs?service=git-upload-pack + POST /v1/git/{agent_id}/state.git/git-upload-pack + GET /v1/git/{agent_id}/state.git/info/refs?service=git-receive-pack + POST /v1/git/{agent_id}/state.git/git-receive-pack + +The dulwich server uses `GCSBackend` to materialize repositories from GCS on +-demand. + +Post-push sync back to GCS/PostgreSQL is triggered from the proxy route after a +successful `git-receive-pack`. +""" + +from __future__ import annotations + +import asyncio +import os +import shutil +import tempfile +import threading +from typing import Dict, Iterable, Optional + +import httpx + +# dulwich is an optional dependency (extra = "git-state"). CI installs don't +# include it, so imports must be lazy/guarded. +try: + from dulwich.repo import Repo + from dulwich.server import Backend + from dulwich.web import HTTPGitApplication, make_server + + _DULWICH_AVAILABLE = True +except ImportError: # pragma: no cover + Repo = None # type: ignore[assignment] + + class Backend: # type: ignore[no-redef] + pass + + HTTPGitApplication = None # type: ignore[assignment] + make_server = None # type: ignore[assignment] + _DULWICH_AVAILABLE = False + +from fastapi import APIRouter, Request +from fastapi.responses import JSONResponse, StreamingResponse +from starlette.background import BackgroundTask + +from letta.log import get_logger + +logger = get_logger(__name__) + +# Routes are proxied to dulwich running on a separate port. 
+router = APIRouter(prefix="/git", tags=["git"], include_in_schema=False) + +# Global storage for the server instance (set during app startup) +_server_instance = None + +# org_id/agent_id -> temp working tree path (repo root, with .git inside) +_repo_cache: Dict[str, str] = {} +_repo_locks: Dict[str, threading.Lock] = {} + +# Dulwich server globals +_dulwich_server = None +_dulwich_thread: Optional[threading.Thread] = None + + +def set_server_instance(server) -> None: + """Set the Letta server instance for git operations. Called during app startup.""" + + global _server_instance + _server_instance = server + + +def _get_dulwich_port() -> int: + return int(os.getenv("LETTA_GIT_HTTP_DULWICH_PORT", "8284")) + + +def start_dulwich_server(host: str = "127.0.0.1", port: Optional[int] = None) -> None: + """Start a local dulwich HTTP server in a background thread. + + This is safe to call multiple times; only the first successful call will + start a server in the current process. + """ + + global _dulwich_server, _dulwich_thread + + if not _DULWICH_AVAILABLE: + logger.info("dulwich not installed; git smart HTTP is disabled") + return + + if _dulwich_thread and _dulwich_thread.is_alive(): + return + + if port is None: + port = _get_dulwich_port() + + # Ensure backend can access storage through the running server. + if _server_instance is None: + raise RuntimeError("Server instance not set (did you call set_server_instance?)") + + try: + _dulwich_server = make_server(host, port, _git_wsgi_app) + except OSError as e: + # When running with multiple uvicorn workers, only one process can bind + # to the configured port. 
+ logger.warning("Failed to bind dulwich git server on %s:%s: %s", host, port, e) + return + + def _run(): + logger.info("Starting dulwich git HTTP server on http://%s:%s", host, port) + try: + _dulwich_server.serve_forever() + except Exception: + logger.exception("Dulwich git HTTP server crashed") + + _dulwich_thread = threading.Thread(target=_run, name="dulwich-git-http", daemon=True) + _dulwich_thread.start() + + +def stop_dulwich_server() -> None: + """Stop the local dulwich server (best-effort).""" + + global _dulwich_server + if _dulwich_server is None: + return + try: + _dulwich_server.shutdown() + except Exception: + logger.exception("Failed to shutdown dulwich server") + + +def _default_org_id() -> str: + if _server_instance is None: + raise RuntimeError("Server instance not set") + default_user = getattr(_server_instance, "default_user", None) + org_id = getattr(default_user, "organization_id", None) + if not org_id: + raise RuntimeError("Unable to infer org_id for git HTTP path") + return org_id + + +class GCSBackend(Backend): + """Dulwich backend that materializes repos from GCS.""" + + def open_repository(self, path: str | bytes): + """Open a repository by path. + + dulwich passes paths like: + /{agent_id}/state.git + /{agent_id}/state.git/info/refs + /{agent_id}/state.git/git-upload-pack + /{agent_id}/state.git/git-receive-pack + + We map those to an on-disk repo cached in a temp dir. + """ + + if not _DULWICH_AVAILABLE or Repo is None: + raise RuntimeError("dulwich not installed") + + if isinstance(path, (bytes, bytearray)): + path = path.decode("utf-8", errors="surrogateescape") + + parts = path.strip("/").split("/") + + # Supported path form: /{agent_id}/state.git[/...] 
+ if "state.git" not in parts: + raise ValueError(f"Invalid repository path (missing state.git): {path}") + + repo_idx = parts.index("state.git") + if repo_idx != 1: + raise ValueError(f"Invalid repository path (expected /{{agent_id}}/state.git): {path}") + + agent_id = parts[0] + org_id = _default_org_id() + + cache_key = f"{org_id}/{agent_id}" + logger.info("GCSBackend.open_repository: org=%s agent=%s", org_id, agent_id) + + lock = _repo_locks.setdefault(cache_key, threading.Lock()) + with lock: + # Always refresh from GCS to avoid serving stale refs/objects when the + # repo is mutated through non-git code paths (e.g. git-state APIs) + # or when multiple app workers are running. + old_repo_path = _repo_cache.pop(cache_key, None) + if old_repo_path: + shutil.rmtree(os.path.dirname(old_repo_path), ignore_errors=True) + + repo_path = self._download_repo_sync(agent_id=agent_id, org_id=org_id) + _repo_cache[cache_key] = repo_path + + repo = Repo(repo_path) + _prune_broken_refs(repo) + return repo + + def _download_repo_sync(self, agent_id: str, org_id: str) -> str: + """Synchronously download a repo from GCS. + + dulwich runs in a background thread (wsgiref server thread), so we should + not assume we're on the main event loop. + """ + + if _server_instance is None: + raise RuntimeError("Server instance not set (did you call set_server_instance?)") + + # This runs in a dulwich-managed WSGI thread, not an AnyIO worker thread. + # Use a dedicated event loop to run the async download. + return asyncio.run(self._download_repo(agent_id, org_id)) + + async def _download_repo(self, agent_id: str, org_id: str) -> str: + """Download repo from GCS into a temporary working tree.""" + + storage = _server_instance.memory_repo_manager.git.storage + storage_prefix = f"{org_id}/{agent_id}/repo.git" + + files = await storage.list_files(storage_prefix) + if not files: + # Create an empty repo on-demand so clients can `git clone` immediately. 
+ logger.info("Repository not found for agent %s; creating empty repo", agent_id) + await _server_instance.memory_repo_manager.git.create_repo( + agent_id=agent_id, + org_id=org_id, + initial_files={}, + author_name="Letta System", + author_email="system@letta.ai", + ) + files = await storage.list_files(storage_prefix) + if not files: + raise FileNotFoundError(f"Repository not found for agent {agent_id}") + + temp_dir = tempfile.mkdtemp(prefix="letta-git-http-") + repo_path = os.path.join(temp_dir, "repo") + git_dir = os.path.join(repo_path, ".git") + os.makedirs(git_dir) + + # Ensure required git directories exist for fetch/push even if GCS doesn't + # have any objects packed yet. + for subdir in [ + "objects", + os.path.join("objects", "pack"), + os.path.join("objects", "info"), + "refs", + os.path.join("refs", "heads"), + os.path.join("refs", "tags"), + "info", + ]: + os.makedirs(os.path.join(git_dir, subdir), exist_ok=True) + + async def download_file(file_path: str): + if file_path.startswith(storage_prefix): + rel_path = file_path[len(storage_prefix) + 1 :] + else: + rel_path = file_path.split("/")[-1] + + if not rel_path: + return + + local_path = os.path.join(git_dir, rel_path) + os.makedirs(os.path.dirname(local_path), exist_ok=True) + content = await storage.download_bytes(file_path) + with open(local_path, "wb") as f: + f.write(content) + + await asyncio.gather(*[download_file(f) for f in files]) + logger.info("Downloaded %s files from GCS for agent %s", len(files), agent_id) + + return repo_path + + +def _prune_broken_refs(repo: Repo) -> int: + """Remove refs that point at missing objects. + + This can happen if a prior push partially failed after updating refs but + before all objects were persisted to backing storage. + + We prune these so dulwich doesn't advertise/resolve against corrupt refs, + which can lead to `UnresolvedDeltas` during subsequent pushes. 
+ """ + + removed = 0 + try: + ref_names = list(repo.refs.keys()) + except Exception: + logger.exception("Failed to enumerate refs for pruning") + return 0 + + for name in ref_names: + # HEAD is commonly symbolic; skip. + if name in {b"HEAD", "HEAD"}: + continue + try: + sha = repo.refs[name] + except Exception: + continue + if not sha: + continue + try: + if sha not in repo.object_store: + logger.warning("Pruning broken ref %r -> %r", name, sha) + try: + repo.refs.remove_if_equals(name, sha) + except Exception: + # Best-effort fallback + try: + del repo.refs[name] + except Exception: + pass + removed += 1 + except Exception: + logger.exception("Failed while checking ref %r", name) + + return removed + + +async def _sync_after_push(org_id: str, agent_id: str) -> None: + """Sync repo back to GCS and PostgreSQL after a successful push.""" + + cache_key = f"{org_id}/{agent_id}" + + repo_path = _repo_cache.get(cache_key) + if not repo_path: + logger.warning("No cached repo for %s after push", cache_key) + return + + if not os.path.exists(repo_path): + logger.warning("Repo path %s does not exist after push", repo_path) + return + + logger.info("Syncing repo after push: org=%s agent=%s", org_id, agent_id) + + storage = _server_instance.memory_repo_manager.git.storage + storage_prefix = f"{org_id}/{agent_id}/repo.git" + git_dir = os.path.join(repo_path, ".git") + + upload_tasks = [] + for root, _dirs, files in os.walk(git_dir): + for filename in files: + local_file = os.path.join(root, filename) + rel_path = os.path.relpath(local_file, git_dir) + storage_path = f"{storage_prefix}/{rel_path}" + + with open(local_file, "rb") as f: + content = f.read() + + upload_tasks.append(storage.upload_bytes(storage_path, content)) + + await asyncio.gather(*upload_tasks) + logger.info("Uploaded %s files to GCS", len(upload_tasks)) + + # Sync blocks to Postgres (if using GitEnabledBlockManager) + from letta.services.block_manager_git import GitEnabledBlockManager + + if 
isinstance(_server_instance.block_manager, GitEnabledBlockManager): + actor = await _server_instance.user_manager.get_actor_or_default_async(actor_id=None) + + repo = Repo(repo_path) + from dulwich.porcelain import reset + + try: + reset(repo, "hard") + except Exception as e: + logger.warning("Failed to reset repo: %s", e) + + blocks_dir = os.path.join(repo_path, "blocks") + if os.path.exists(blocks_dir): + for filename in os.listdir(blocks_dir): + if not filename.endswith(".md"): + continue + + label = filename[:-3] + filepath = os.path.join(blocks_dir, filename) + with open(filepath, "r") as f: + value = f.read() + + await _server_instance.block_manager._sync_block_to_postgres( + agent_id=agent_id, + label=label, + value=value, + actor=actor, + ) + logger.info("Synced block %s to PostgreSQL", label) + + # Cleanup local cache + _repo_cache.pop(cache_key, None) + shutil.rmtree(os.path.dirname(repo_path), ignore_errors=True) + + +def _parse_org_agent_from_repo_path(path: str) -> Optional[tuple[str, str]]: + """Extract (org_id, agent_id) from a git HTTP path. + + Expected path form: + - {agent_id}/state.git/... + + org_id is inferred from the running server instance. 
+ """ + + parts = path.strip("/").split("/") + if len(parts) < 2: + return None + + if parts[1] != "state.git": + return None + + return _default_org_id(), parts[0] + + +def _filter_out_hop_by_hop_headers(headers: Iterable[tuple[str, str]]) -> Dict[str, str]: + # RFC 7230 hop-by-hop headers that should not be forwarded + hop_by_hop = { + "connection", + "keep-alive", + "proxy-authenticate", + "proxy-authorization", + "te", + "trailers", + "transfer-encoding", + "upgrade", + } + + out: Dict[str, str] = {} + for k, v in headers: + lk = k.lower() + if lk in hop_by_hop: + continue + out[k] = v + return out + + +@router.api_route("/{path:path}", methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"]) # pragma: no cover +async def proxy_git_http(path: str, request: Request): + """Proxy `/v1/git/*` requests to the local dulwich WSGI server.""" + + if not _DULWICH_AVAILABLE: + return JSONResponse( + status_code=501, + content={ + "detail": "git smart HTTP is disabled (dulwich not installed)", + }, + ) + + # Ensure server is running (best-effort). We also start it during lifespan. + start_dulwich_server() + + port = _get_dulwich_port() + url = f"http://127.0.0.1:{port}/{path}" + + req_headers = _filter_out_hop_by_hop_headers(request.headers.items()) + # Avoid sending FastAPI host/length; httpx will compute + req_headers.pop("host", None) + req_headers.pop("content-length", None) + + async def _body_iter(): + async for chunk in request.stream(): + yield chunk + + client = httpx.AsyncClient(timeout=None) + req = client.build_request( + method=request.method, + url=url, + params=request.query_params, + headers=req_headers, + content=_body_iter() if request.method not in {"GET", "HEAD"} else None, + ) + upstream = await client.send(req, stream=True) + + resp_headers = _filter_out_hop_by_hop_headers(upstream.headers.items()) + + # If this was a push, trigger our sync. 
+ if request.method == "POST" and path.endswith("git-receive-pack") and upstream.status_code < 400: + parsed = _parse_org_agent_from_repo_path(path) + if parsed is not None: + org_id, agent_id = parsed + # Fire-and-forget; do not block git client response. + asyncio.create_task(_sync_after_push(org_id, agent_id)) + + async def _aclose_upstream_and_client() -> None: + try: + await upstream.aclose() + finally: + await client.aclose() + + return StreamingResponse( + upstream.aiter_raw(), + status_code=upstream.status_code, + headers=resp_headers, + media_type=upstream.headers.get("content-type"), + background=BackgroundTask(_aclose_upstream_and_client), + ) + + +# dulwich WSGI app (optional) +_backend = GCSBackend() +_git_wsgi_app = HTTPGitApplication(_backend) if _DULWICH_AVAILABLE and HTTPGitApplication is not None else None diff --git a/letta/server/server.py b/letta/server/server.py index f00a7405..064767cc 100644 --- a/letta/server/server.py +++ b/letta/server/server.py @@ -91,6 +91,7 @@ from letta.services.agent_manager import AgentManager from letta.services.agent_serialization_manager import AgentSerializationManager from letta.services.archive_manager import ArchiveManager from letta.services.block_manager import BlockManager +from letta.services.block_manager_git import GIT_MEMORY_ENABLED_TAG, GitEnabledBlockManager from letta.services.file_manager import FileManager from letta.services.files_agents_manager import FileAgentManager from letta.services.group_manager import GroupManager @@ -104,6 +105,8 @@ from letta.services.mcp.sse_client import MCP_CONFIG_TOPLEVEL_KEY from letta.services.mcp.stdio_client import AsyncStdioMCPClient from letta.services.mcp_manager import MCPManager from letta.services.mcp_server_manager import MCPServerManager +from letta.services.memory_repo import MemoryRepoManager +from letta.services.memory_repo.storage.gcs import GCSStorageBackend from letta.services.message_manager import MessageManager from 
letta.services.organization_manager import OrganizationManager from letta.services.passage_manager import PassageManager @@ -165,13 +168,19 @@ class SyncServer(object): self.tool_manager = ToolManager() self.mcp_manager = MCPManager() self.mcp_server_manager = MCPServerManager() - self.block_manager = BlockManager() + self.memory_repo_manager = self._init_memory_repo_manager() + # Use git-enabled block manager if memory repo is configured + # It falls back to standard PostgreSQL behavior when git isn't enabled for an agent + if self.memory_repo_manager: + self.block_manager = GitEnabledBlockManager(memory_repo_manager=self.memory_repo_manager) + else: + self.block_manager = BlockManager() self.source_manager = SourceManager() self.sandbox_config_manager = SandboxConfigManager() self.message_manager = MessageManager() self.job_manager = JobManager() self.run_manager = RunManager() - self.agent_manager = AgentManager() + self.agent_manager = AgentManager(block_manager=self.block_manager) self.archive_manager = ArchiveManager() self.provider_manager = ProviderManager() self.step_manager = StepManager() @@ -416,6 +425,55 @@ class SyncServer(object): force_recreate=True, ) + def _init_memory_repo_manager(self) -> Optional[MemoryRepoManager]: + """Initialize the memory repository manager if configured. + + Configure the object store via settings (recommended): + + LETTA_OBJECT_STORE_URI="gs://my-bucket/repository?project=my-gcp-project" + + Supported schemes: + - gs:// (or gcs://) -> Google Cloud Storage + + Returns: + MemoryRepoManager if configured, None otherwise + """ + + # Keep import local to avoid import/circular issues during server bootstrap. 
+ from urllib.parse import parse_qs, urlparse + + from letta.settings import settings + + uri = settings.object_store_uri + if not uri: + logger.debug("Memory repo manager not configured (object_store_uri not set)") + return None + + try: + parsed = urlparse(uri) + scheme = (parsed.scheme or "").lower() + + if scheme in {"gs", "gcs"}: + bucket = parsed.netloc + if not bucket: + raise ValueError(f"Invalid GCS object store URI (missing bucket): {uri}") + + # URI path is treated as the storage prefix + prefix = parsed.path.lstrip("/") or "repository" + qs = parse_qs(parsed.query) + + # Allow settings-level overrides (handy for templated URIs). + project = settings.object_store_project or (qs.get("project") or [None])[0] + + storage = GCSStorageBackend(bucket=bucket, prefix=prefix, project=project) + logger.info("Memory repo manager initialized with object store: %s", uri) + return MemoryRepoManager(storage=storage) + + raise ValueError(f"Unsupported object store scheme '{scheme}' in URI: {uri}") + except Exception as e: + logger.warning(f"Failed to initialize memory repo manager: {e}") + return None + def _get_enabled_provider(self, provider_name: str) -> Optional[Provider]: """Find and return an enabled provider by name. @@ -571,13 +629,31 @@ class SyncServer(object): request.embedding_config = await self.get_embedding_config_from_handle_async(actor=actor, **embedding_config_params) log_event(name="end get_embedding_config_from_handle", attributes=embedding_config_params) + # If git-backed memory is requested on create, we enable it *after* agent creation. + # We strip the tag during creation so `enable_git_memory_for_agent` can be the + # single place that both creates the repo and writes the tag. 
+ wants_git_memory = bool(request.tags and GIT_MEMORY_ENABLED_TAG in request.tags) + create_request = request + if wants_git_memory: + filtered_tags = [t for t in (request.tags or []) if t != GIT_MEMORY_ENABLED_TAG] + create_request = request.model_copy(update={"tags": filtered_tags}) + log_event(name="start create_agent db") main_agent = await self.agent_manager.create_agent_async( - agent_create=request, + agent_create=create_request, actor=actor, ) log_event(name="end create_agent db") + # Enable git-backed memory (creates repo + commits initial blocks + adds tag) + if wants_git_memory and isinstance(self.block_manager, GitEnabledBlockManager): + await self.block_manager.enable_git_memory_for_agent(agent_id=main_agent.id, actor=actor) + # Preserve the user's requested tags in the response model. + try: + main_agent.tags = list(request.tags or []) + except Exception: + pass + log_event(name="start insert_files_into_context_window db") # Use folder_ids if provided, otherwise fall back to deprecated source_ids for backwards compatibility folder_ids_to_attach = request.folder_ids if request.folder_ids else request.source_ids @@ -650,12 +726,26 @@ class SyncServer(object): else: await self.create_sleeptime_agent_async(main_agent=agent, actor=actor) - return await self.agent_manager.update_agent_async( + # If git-backed memory is requested via tag update, initialize/backfill the repo. + wants_git_memory = bool(request.tags and GIT_MEMORY_ENABLED_TAG in request.tags) + + updated_agent = await self.agent_manager.update_agent_async( agent_id=agent_id, agent_update=request, actor=actor, ) + # Ensure repo exists and initial blocks are committed when the tag is present. + if wants_git_memory and isinstance(self.block_manager, GitEnabledBlockManager): + await self.block_manager.enable_git_memory_for_agent(agent_id=agent_id, actor=actor) + # Preserve the user's requested tags in the response model. 
+ try: + updated_agent.tags = list(request.tags or []) + except Exception: + pass + + return updated_agent + async def create_sleeptime_agent_async(self, main_agent: AgentState, actor: User) -> Optional[AgentState]: if main_agent.embedding_config is None: logger.warning(f"Skipping sleeptime agent creation for agent {main_agent.id}: no embedding config provided") diff --git a/letta/services/agent_manager.py b/letta/services/agent_manager.py index 8a9192dc..61ef77ca 100644 --- a/letta/services/agent_manager.py +++ b/letta/services/agent_manager.py @@ -129,8 +129,8 @@ logger = get_logger(__name__) class AgentManager: """Manager class to handle business logic related to Agents.""" - def __init__(self): - self.block_manager = BlockManager() + def __init__(self, block_manager: Optional[BlockManager] = None): + self.block_manager = block_manager or BlockManager() self.tool_manager = ToolManager() self.source_manager = SourceManager() self.message_manager = MessageManager() @@ -1985,6 +1985,9 @@ class AgentManager: actor: PydanticUser, ) -> PydanticBlock: """Modifies a block attached to an agent by its label.""" + + block_id_for_custom_manager: str | None = None + async with db_registry.async_session() as session: matched_block = None agent = await AgentModel.read_async(db_session=session, identifier=agent_id, actor=actor) @@ -1997,33 +2000,46 @@ class AgentManager: update_data = block_update.model_dump(to_orm=True, exclude_unset=True, exclude_none=True) - # Extract tags from update data (it's not a column on the block table) - new_tags = update_data.pop("tags", None) - # Validate limit constraints before updating validate_block_limit_constraint(update_data, matched_block) - for key, value in update_data.items(): - setattr(matched_block, key, value) - - await matched_block.update_async(session, actor=actor) - - if new_tags is not None: - await BlockManager._replace_block_pivot_rows_async( - session, - BlocksTags.__table__, - matched_block.id, - [{"block_id": 
matched_block.id, "tag": tag} for tag in new_tags], - ) - - pydantic_block = matched_block.to_pydantic() - if new_tags is not None: - pydantic_block.tags = new_tags + # If a custom block manager is injected (e.g. GitEnabledBlockManager), route + # through it so git-backed memory semantics apply. + if self.block_manager.__class__ is not BlockManager: + block_id_for_custom_manager = matched_block.id else: - tags_result = await session.execute(select(BlocksTags.tag).where(BlocksTags.block_id == matched_block.id)) - pydantic_block.tags = [row[0] for row in tags_result.fetchall()] + # Extract tags from update data (it's not a column on the block table) + new_tags = update_data.pop("tags", None) - return pydantic_block + for key, value in update_data.items(): + setattr(matched_block, key, value) + + await matched_block.update_async(session, actor=actor) + + if new_tags is not None: + await BlockManager._replace_block_pivot_rows_async( + session, + BlocksTags.__table__, + matched_block.id, + [{"block_id": matched_block.id, "tag": tag} for tag in new_tags], + ) + + pydantic_block = matched_block.to_pydantic() + if new_tags is not None: + pydantic_block.tags = new_tags + else: + tags_result = await session.execute(select(BlocksTags.tag).where(BlocksTags.block_id == matched_block.id)) + pydantic_block.tags = [row[0] for row in tags_result.fetchall()] + + return pydantic_block + + # Route through block_manager which handles git integration if enabled + assert block_id_for_custom_manager is not None + return await self.block_manager.update_block_async( + block_id=block_id_for_custom_manager, + block_update=block_update, + actor=actor, + ) @enforce_types @raise_on_invalid_id(param_name="agent_id", expected_prefix=PrimitiveType.AGENT) diff --git a/letta/services/block_manager.py b/letta/services/block_manager.py index 848c4868..abc390ae 100644 --- a/letta/services/block_manager.py +++ b/letta/services/block_manager.py @@ -565,6 +565,29 @@ class BlockManager: return 
pydantic_blocks + @enforce_types + @trace_method + async def get_blocks_by_agent_async(self, agent_id: str, actor: PydanticUser) -> List[PydanticBlock]: + """Retrieve all blocks attached to a specific agent.""" + async with db_registry.async_session() as session: + query = ( + select(BlockModel) + .join(BlocksAgents, BlockModel.id == BlocksAgents.block_id) + .where( + BlocksAgents.agent_id == agent_id, + BlockModel.organization_id == actor.organization_id, + ) + .options( + noload(BlockModel.agents), + noload(BlockModel.identities), + noload(BlockModel.groups), + noload(BlockModel.tags), + ) + ) + result = await session.execute(query) + blocks = result.scalars().all() + return [block.to_pydantic() for block in blocks] + @enforce_types @raise_on_invalid_id(param_name="block_id", expected_prefix=PrimitiveType.BLOCK) @trace_method diff --git a/letta/services/block_manager_git.py b/letta/services/block_manager_git.py new file mode 100644 index 00000000..82a73082 --- /dev/null +++ b/letta/services/block_manager_git.py @@ -0,0 +1,511 @@ +"""Git-enabled block manager that uses object storage as source of truth. + +When an agent has the GIT_MEMORY_ENABLED_TAG tag, block operations: +1. Write to git (GCS) first - source of truth +2. Update PostgreSQL as cache + +This provides full version history while maintaining fast reads from PostgreSQL. 
+""" + +import json +import time +from typing import List, Optional + +from letta.log import get_logger +from letta.orm.block import Block as BlockModel +from letta.otel.tracing import trace_method +from letta.schemas.block import Block as PydanticBlock, BlockUpdate, CreateBlock +from letta.schemas.user import User as PydanticUser +from letta.server.db import db_registry +from letta.services.block_manager import BlockManager +from letta.services.memory_repo.manager import MemoryRepoManager +from letta.utils import enforce_types + +logger = get_logger(__name__) + +# Tag that enables git-based memory for an agent +GIT_MEMORY_ENABLED_TAG = "git-memory-enabled" + + +class GitEnabledBlockManager(BlockManager): + """Block manager that uses git as source of truth when enabled for an agent. + + For agents with the GIT_MEMORY_ENABLED_TAG: + - All writes go to git first, then sync to PostgreSQL + - Reads come from PostgreSQL (cache) for performance + - Full version history is maintained in git + + For agents without the tag: + - Behaves exactly like the standard BlockManager + """ + + def __init__(self, memory_repo_manager: Optional[MemoryRepoManager] = None): + """Initialize the git-enabled block manager. + + Args: + memory_repo_manager: The memory repo manager for git operations. + If None, git features are disabled. 
+ """ + super().__init__() + self.memory_repo_manager = memory_repo_manager + + async def _is_git_enabled_for_agent(self, agent_id: str, actor: PydanticUser) -> bool: + """Check if an agent has git-based memory enabled.""" + if self.memory_repo_manager is None: + return False + + # Check if agent has the git-memory-enabled tag + async with db_registry.async_session() as session: + from sqlalchemy import select + + from letta.orm.agents_tags import AgentsTags + + result = await session.execute( + select(AgentsTags).where( + AgentsTags.agent_id == agent_id, + AgentsTags.tag == GIT_MEMORY_ENABLED_TAG, + ) + ) + return result.scalar_one_or_none() is not None + + async def _get_agent_id_for_block(self, block_id: str, actor: PydanticUser) -> Optional[str]: + """Get the agent ID that owns a block.""" + async with db_registry.async_session() as session: + from sqlalchemy import select + + from letta.orm.blocks_agents import BlocksAgents + + result = await session.execute(select(BlocksAgents.agent_id).where(BlocksAgents.block_id == block_id)) + row = result.first() + return row[0] if row else None + + async def _sync_block_to_postgres( + self, + agent_id: str, + label: str, + value: str, + actor: PydanticUser, + description: Optional[str] = None, + limit: Optional[int] = None, + ) -> PydanticBlock: + """Sync a block from git to PostgreSQL cache.""" + async with db_registry.async_session() as session: + from sqlalchemy import select + + from letta.orm.blocks_agents import BlocksAgents + + # Find existing block for this agent+label + result = await session.execute( + select(BlockModel) + .join(BlocksAgents, BlocksAgents.block_id == BlockModel.id) + .where( + BlocksAgents.agent_id == agent_id, + BlockModel.label == label, + BlockModel.organization_id == actor.organization_id, + ) + ) + block = result.scalar_one_or_none() + + if block: + # Update existing block + block.value = value + if description is not None: + block.description = description + if limit is not None: + 
block.limit = limit + await block.update_async(db_session=session, actor=actor) + else: + # Create new block + block = BlockModel( + label=label, + value=value, + description=description or f"{label} block", + limit=limit or 5000, + organization_id=actor.organization_id, + ) + await block.create_async(db_session=session, actor=actor) + + # Link to agent + from letta.orm.blocks_agents import BlocksAgents + + blocks_agents = BlocksAgents( + agent_id=agent_id, + block_id=block.id, + ) + session.add(blocks_agents) + + return block.to_pydantic() + + async def _delete_block_from_postgres( + self, + agent_id: str, + label: str, + actor: PydanticUser, + ) -> None: + """Delete a block from PostgreSQL cache.""" + async with db_registry.async_session() as session: + from sqlalchemy import delete, select + + from letta.orm.blocks_agents import BlocksAgents + + # Find block for this agent+label + result = await session.execute( + select(BlockModel) + .join(BlocksAgents, BlocksAgents.block_id == BlockModel.id) + .where( + BlocksAgents.agent_id == agent_id, + BlockModel.label == label, + BlockModel.organization_id == actor.organization_id, + ) + ) + block = result.scalar_one_or_none() + + if block: + # Delete from blocks_agents + await session.execute(delete(BlocksAgents).where(BlocksAgents.block_id == block.id)) + # Delete the block + await block.hard_delete_async(db_session=session, actor=actor) + + # ========================================================================= + # Override BlockManager methods to add git integration + # ========================================================================= + + @enforce_types + @trace_method + async def update_block_async( + self, + block_id: str, + block_update: BlockUpdate, + actor: PydanticUser, + ) -> PydanticBlock: + """Update a block. 
If git-enabled, commits to git first.""" + t_start = time.perf_counter() + logger.info(f"[GIT_PERF] update_block_async START block_id={block_id}") + + # Get agent ID for this block + t0 = time.perf_counter() + agent_id = await self._get_agent_id_for_block(block_id, actor) + logger.info(f"[GIT_PERF] _get_agent_id_for_block took {(time.perf_counter() - t0) * 1000:.2f}ms agent_id={agent_id}") + + # Check if git is enabled for this agent + t0 = time.perf_counter() + git_enabled = agent_id and await self._is_git_enabled_for_agent(agent_id, actor) + logger.info(f"[GIT_PERF] _is_git_enabled_for_agent took {(time.perf_counter() - t0) * 1000:.2f}ms enabled={git_enabled}") + + if git_enabled: + # Get current block to get label + t0 = time.perf_counter() + async with db_registry.async_session() as session: + block = await BlockModel.read_async(db_session=session, identifier=block_id, actor=actor) + label = block.label + logger.info(f"[GIT_PERF] BlockModel.read_async took {(time.perf_counter() - t0) * 1000:.2f}ms label={label}") + + # 1. Commit to git (source of truth) + if block_update.value is not None: + t0 = time.perf_counter() + commit = await self.memory_repo_manager.update_block_async( + agent_id=agent_id, + label=label, + value=block_update.value, + actor=actor, + message=f"Update {label} block", + ) + git_time = (time.perf_counter() - t0) * 1000 + logger.info(f"[GIT_PERF] memory_repo_manager.update_block_async took {git_time:.2f}ms commit={commit.sha[:8]}") + + # 2. Sync to PostgreSQL cache + t0 = time.perf_counter() + result = await self._sync_block_to_postgres( + agent_id=agent_id, + label=label, + value=block_update.value or block.value, + actor=actor, + description=block_update.description, + limit=block_update.limit, + ) + logger.info(f"[GIT_PERF] _sync_block_to_postgres took {(time.perf_counter() - t0) * 1000:.2f}ms") + + # Block tags are not stored in git (today); they remain Postgres-only metadata. 
+ # Preserve legacy behavior by updating tags in Postgres even for git-enabled agents. + if block_update.tags is not None: + async with db_registry.async_session() as session: + from letta.orm.blocks_tags import BlocksTags + + await BlockManager._replace_block_pivot_rows_async( + session, + BlocksTags.__table__, + block_id, + [{"block_id": block_id, "tag": tag, "organization_id": actor.organization_id} for tag in block_update.tags], + ) + result.tags = block_update.tags + else: + async with db_registry.async_session() as session: + from sqlalchemy import select + + from letta.orm.blocks_tags import BlocksTags + + tags_result = await session.execute(select(BlocksTags.tag).where(BlocksTags.block_id == block_id)) + result.tags = [row[0] for row in tags_result.fetchall()] + + total_time = (time.perf_counter() - t_start) * 1000 + logger.info(f"[GIT_PERF] update_block_async TOTAL {total_time:.2f}ms (git-enabled path)") + return result + else: + # Fall back to standard PostgreSQL-only behavior + t0 = time.perf_counter() + result = await super().update_block_async(block_id, block_update, actor) + logger.info(f"[GIT_PERF] super().update_block_async took {(time.perf_counter() - t0) * 1000:.2f}ms") + + total_time = (time.perf_counter() - t_start) * 1000 + logger.info(f"[GIT_PERF] update_block_async TOTAL {total_time:.2f}ms (postgres-only path)") + return result + + @enforce_types + @trace_method + async def create_block_async( + self, + block: CreateBlock, + actor: PydanticUser, + agent_id: Optional[str] = None, + ) -> PydanticBlock: + """Create a block. If git-enabled and agent_id provided, commits to git first.""" + # Check if git is enabled for this agent + if agent_id and await self._is_git_enabled_for_agent(agent_id, actor): + # 1. 
Commit to git (source of truth) + commit = await self.memory_repo_manager.create_block_async( + agent_id=agent_id, + block=PydanticBlock( + label=block.label, + value=block.value, + description=block.description, + limit=block.limit or 5000, + ), + actor=actor, + message=f"Create {block.label} block", + ) + logger.info(f"Git commit for block create: {commit.sha[:8]}") + + # 2. Sync to PostgreSQL cache + return await self._sync_block_to_postgres( + agent_id=agent_id, + label=block.label, + value=block.value, + actor=actor, + description=block.description, + limit=block.limit, + ) + else: + # Fall back to standard PostgreSQL-only behavior + return await super().create_block_async(block, actor) + + @enforce_types + @trace_method + async def delete_block_async(self, block_id: str, actor: PydanticUser) -> None: + """Delete a block. If git-enabled, commits deletion to git first.""" + # Get agent ID and label for this block + agent_id = await self._get_agent_id_for_block(block_id, actor) + + if agent_id and await self._is_git_enabled_for_agent(agent_id, actor): + # Get block label before deleting + async with db_registry.async_session() as session: + block = await BlockModel.read_async(db_session=session, identifier=block_id, actor=actor) + label = block.label + + # 1. Commit deletion to git (source of truth) + commit = await self.memory_repo_manager.delete_block_async( + agent_id=agent_id, + label=label, + actor=actor, + message=f"Delete {label} block", + ) + logger.info(f"Git commit for block delete: {commit.sha[:8]}") + + # 2. 
Delete from PostgreSQL cache + await self._delete_block_from_postgres(agent_id, label, actor) + else: + # Fall back to standard PostgreSQL-only behavior + await super().delete_block_async(block_id, actor) + + # ========================================================================= + # Git-specific methods + # ========================================================================= + + @enforce_types + @trace_method + async def enable_git_memory_for_agent( + self, + agent_id: str, + actor: PydanticUser, + ) -> None: + """Enable git-based memory for an agent. + + This: + 1. Adds the GIT_MEMORY_ENABLED_TAG to the agent + 2. Creates a git repo for the agent + 3. Commits current blocks as initial state + """ + if self.memory_repo_manager is None: + raise ValueError("Memory repo manager not configured") + + # If already enabled (tag exists), ensure the repo exists. + # + # This matters because tags can be added via the agent update endpoint. In that + # flow, the tag may be persisted before the git repo is created. We treat the + # tag as the source-of-truth "desired state" and backfill the repo if missing. + if await self._is_git_enabled_for_agent(agent_id, actor): + try: + # Fast check: does the repo exist in backing storage? 
+ await self.memory_repo_manager.git.get_head_sha(agent_id=agent_id, org_id=actor.organization_id) + logger.info(f"Git memory already enabled for agent {agent_id}") + return + except FileNotFoundError: + logger.warning( + "Git memory tag present but repo missing for agent %s; creating repo from current blocks", + agent_id, + ) + blocks = await self.get_blocks_by_agent_async(agent_id, actor) + await self.memory_repo_manager.create_repo_async( + agent_id=agent_id, + actor=actor, + initial_blocks=blocks, + ) + logger.info(f"Backfilled git repo for agent {agent_id} with {len(blocks)} blocks") + return + + # Get current blocks for this agent + blocks = await self.get_blocks_by_agent_async(agent_id, actor) + + # Create git repo with current blocks + await self.memory_repo_manager.create_repo_async( + agent_id=agent_id, + actor=actor, + initial_blocks=blocks, + ) + + # Add the tag + async with db_registry.async_session() as session: + from letta.orm.agents_tags import AgentsTags + + tag = AgentsTags( + agent_id=agent_id, + tag=GIT_MEMORY_ENABLED_TAG, + ) + session.add(tag) + await session.commit() + + logger.info(f"Enabled git memory for agent {agent_id} with {len(blocks)} blocks") + + @enforce_types + @trace_method + async def disable_git_memory_for_agent( + self, + agent_id: str, + actor: PydanticUser, + ) -> None: + """Disable git-based memory for an agent. + + This removes the tag but keeps the git repo for historical reference. 
+ """ + async with db_registry.async_session() as session: + from sqlalchemy import delete + + from letta.orm.agents_tags import AgentsTags + + await session.execute( + delete(AgentsTags).where( + AgentsTags.agent_id == agent_id, + AgentsTags.tag == GIT_MEMORY_ENABLED_TAG, + ) + ) + + logger.info(f"Disabled git memory for agent {agent_id}") + + @enforce_types + @trace_method + async def get_block_at_commit( + self, + agent_id: str, + label: str, + commit_sha: str, + actor: PydanticUser, + ) -> Optional[PydanticBlock]: + """Get a block's value at a specific commit. + + This is a git-only operation that reads from version history. + """ + if self.memory_repo_manager is None: + raise ValueError("Memory repo manager not configured") + + return await self.memory_repo_manager.get_block_async( + agent_id=agent_id, + label=label, + actor=actor, + ref=commit_sha, + ) + + @enforce_types + @trace_method + async def get_block_history( + self, + agent_id: str, + actor: PydanticUser, + label: Optional[str] = None, + limit: int = 50, + ): + """Get commit history for an agent's memory blocks. + + Args: + agent_id: Agent ID + actor: User performing the operation + label: Optional block label to filter by + limit: Maximum commits to return + + Returns: + List of MemoryCommit objects + """ + if self.memory_repo_manager is None: + raise ValueError("Memory repo manager not configured") + + path = f"blocks/{label}.md" if label else None + return await self.memory_repo_manager.get_history_async( + agent_id=agent_id, + actor=actor, + path=path, + limit=limit, + ) + + @enforce_types + @trace_method + async def sync_blocks_from_git( + self, + agent_id: str, + actor: PydanticUser, + ) -> List[PydanticBlock]: + """Sync all blocks from git to PostgreSQL. + + Use this to rebuild the PostgreSQL cache from git source of truth. 
+ """ + if self.memory_repo_manager is None: + raise ValueError("Memory repo manager not configured") + + # Get all blocks from git + git_blocks = await self.memory_repo_manager.get_blocks_async( + agent_id=agent_id, + actor=actor, + ) + + # Sync each to PostgreSQL + synced_blocks = [] + for block in git_blocks: + synced = await self._sync_block_to_postgres( + agent_id=agent_id, + label=block.label, + value=block.value, + actor=actor, + description=block.description, + limit=block.limit, + ) + synced_blocks.append(synced) + + logger.info(f"Synced {len(synced_blocks)} blocks from git for agent {agent_id}") + return synced_blocks diff --git a/letta/services/memory_repo/__init__.py b/letta/services/memory_repo/__init__.py new file mode 100644 index 00000000..0d88b161 --- /dev/null +++ b/letta/services/memory_repo/__init__.py @@ -0,0 +1,11 @@ +"""Git-based memory repository services.""" + +from letta.services.memory_repo.manager import MemoryRepoManager +from letta.services.memory_repo.storage.base import StorageBackend +from letta.services.memory_repo.storage.gcs import GCSStorageBackend + +__all__ = [ + "MemoryRepoManager", + "StorageBackend", + "GCSStorageBackend", +] diff --git a/letta/services/memory_repo/git_operations.py b/letta/services/memory_repo/git_operations.py new file mode 100644 index 00000000..75ce5946 --- /dev/null +++ b/letta/services/memory_repo/git_operations.py @@ -0,0 +1,599 @@ +"""Git operations for memory repositories using dulwich. + +Dulwich is a pure-Python implementation of Git that allows us to +manipulate git repositories without requiring libgit2 or the git CLI. + +This module provides high-level operations for working with git repos +stored in object storage (GCS/S3). 
+""" + +import asyncio +import io +import os +import shutil +import tempfile +import time +import uuid +from datetime import datetime, timezone +from typing import Dict, List, Optional, Tuple + +from letta.data_sources.redis_client import get_redis_client +from letta.log import get_logger +from letta.schemas.memory_repo import FileChange, MemoryCommit +from letta.services.memory_repo.storage.base import StorageBackend + +logger = get_logger(__name__) + + +class GitOperations: + """High-level git operations for memory repositories. + + This class provides git operations that work with repositories + stored in object storage. It downloads the repo to a temp directory, + performs operations, and uploads the changes back. + + For efficiency with small repos (100s of files), we use a full + checkout model. For larger repos, we could optimize to work with + packfiles directly. + + Requirements: + pip install dulwich + """ + + def __init__(self, storage: StorageBackend): + """Initialize git operations. + + Args: + storage: Storage backend for repo persistence + """ + self.storage = storage + self._dulwich = None + + def _get_dulwich(self): + """Lazily import dulwich.""" + if self._dulwich is None: + try: + import dulwich + import dulwich.objects + import dulwich.porcelain + import dulwich.repo + + self._dulwich = dulwich + except ImportError: + raise ImportError("dulwich is required for git operations. Install with: pip install dulwich") + return self._dulwich + + def _repo_path(self, agent_id: str, org_id: str) -> str: + """Get the storage path for an agent's repo.""" + return f"{org_id}/{agent_id}/repo.git" + + async def create_repo( + self, + agent_id: str, + org_id: str, + initial_files: Optional[Dict[str, str]] = None, + author_name: str = "Letta System", + author_email: str = "system@letta.ai", + ) -> str: + """Create a new git repository for an agent. 
+ + Args: + agent_id: Agent ID + org_id: Organization ID + initial_files: Optional initial files to commit + author_name: Author name for initial commit + author_email: Author email for initial commit + + Returns: + Initial commit SHA + """ + dulwich = self._get_dulwich() + + def _create(): + # Create a temporary directory for the repo + temp_dir = tempfile.mkdtemp(prefix="letta-memrepo-") + try: + repo_path = os.path.join(temp_dir, "repo") + os.makedirs(repo_path) + + # Initialize a new repository + repo = dulwich.repo.Repo.init(repo_path) + + # Use `main` as the default branch (git's modern default). + head_path = os.path.join(repo_path, ".git", "HEAD") + with open(head_path, "wb") as f: + f.write(b"ref: refs/heads/main\n") + + # Add initial files if provided + if initial_files: + for file_path, content in initial_files.items(): + full_path = os.path.join(repo_path, file_path) + os.makedirs(os.path.dirname(full_path), exist_ok=True) + with open(full_path, "w", encoding="utf-8") as f: + f.write(content) + # Stage the file + dulwich.porcelain.add(repo_path, paths=[file_path]) + else: + # Create an empty .letta directory to initialize + letta_dir = os.path.join(repo_path, ".letta") + os.makedirs(letta_dir, exist_ok=True) + config_path = os.path.join(letta_dir, "config.json") + with open(config_path, "w") as f: + f.write('{"version": 1}') + dulwich.porcelain.add(repo_path, paths=[".letta/config.json"]) + + # Create initial commit using porcelain (dulwich 1.0+ API) + commit_sha = dulwich.porcelain.commit( + repo_path, + message=b"Initial commit", + committer=f"{author_name} <{author_email}>".encode(), + author=f"{author_name} <{author_email}>".encode(), + ) + + # Return the repo directory and commit SHA for upload + return repo_path, commit_sha.decode() if isinstance(commit_sha, bytes) else str(commit_sha) + except Exception: + shutil.rmtree(temp_dir, ignore_errors=True) + raise + + repo_path, commit_sha = await asyncio.to_thread(_create) + + try: + # Upload the repo 
to storage + await self._upload_repo(repo_path, agent_id, org_id) + return commit_sha + finally: + # Clean up temp directory + shutil.rmtree(os.path.dirname(repo_path), ignore_errors=True) + + async def _upload_repo(self, local_repo_path: str, agent_id: str, org_id: str) -> None: + """Upload a local repo to storage.""" + t_start = time.perf_counter() + storage_prefix = self._repo_path(agent_id, org_id) + + # Walk through the .git directory and collect all files + git_dir = os.path.join(local_repo_path, ".git") + upload_tasks = [] + total_bytes = 0 + + t0 = time.perf_counter() + for root, dirs, files in os.walk(git_dir): + for filename in files: + local_path = os.path.join(root, filename) + rel_path = os.path.relpath(local_path, git_dir) + storage_path = f"{storage_prefix}/{rel_path}" + + with open(local_path, "rb") as f: + content = f.read() + + total_bytes += len(content) + upload_tasks.append((storage_path, content)) + read_time = (time.perf_counter() - t0) * 1000 + logger.info(f"[GIT_PERF] _upload_repo read files took {read_time:.2f}ms files={len(upload_tasks)}") + + # Upload all files in parallel + t0 = time.perf_counter() + await asyncio.gather(*[self.storage.upload_bytes(path, content) for path, content in upload_tasks]) + upload_time = (time.perf_counter() - t0) * 1000 + + total_time = (time.perf_counter() - t_start) * 1000 + logger.info( + f"[GIT_PERF] _upload_repo TOTAL {total_time:.2f}ms " + f"files={len(upload_tasks)} bytes={total_bytes} " + f"upload_time={upload_time:.2f}ms" + ) + + async def _download_repo(self, agent_id: str, org_id: str) -> str: + """Download a repo from storage to a temp directory. 
+ + Returns: + Path to the temporary repo directory + """ + t_start = time.perf_counter() + storage_prefix = self._repo_path(agent_id, org_id) + + # List all files in the repo + t0 = time.perf_counter() + files = await self.storage.list_files(storage_prefix) + list_time = (time.perf_counter() - t0) * 1000 + logger.info(f"[GIT_PERF] _download_repo storage.list_files took {list_time:.2f}ms files_count={len(files)}") + + if not files: + raise FileNotFoundError(f"No repository found for agent {agent_id}") + + # Create temp directory + t0 = time.perf_counter() + temp_dir = tempfile.mkdtemp(prefix="letta-memrepo-") + repo_path = os.path.join(temp_dir, "repo") + git_dir = os.path.join(repo_path, ".git") + os.makedirs(git_dir) + mkdir_time = (time.perf_counter() - t0) * 1000 + logger.info(f"[GIT_PERF] _download_repo tempdir creation took {mkdir_time:.2f}ms path={temp_dir}") + + # Compute local paths and create directories first + file_info = [] + for file_path in files: + if file_path.startswith(storage_prefix): + rel_path = file_path[len(storage_prefix) + 1 :] + else: + rel_path = file_path.split("/")[-1] if "/" in file_path else file_path + + local_path = os.path.join(git_dir, rel_path) + os.makedirs(os.path.dirname(local_path), exist_ok=True) + file_info.append((file_path, local_path)) + + # Download all files in parallel + t0 = time.perf_counter() + download_tasks = [self.storage.download_bytes(fp) for fp, _ in file_info] + contents = await asyncio.gather(*download_tasks) + download_time = (time.perf_counter() - t0) * 1000 + total_bytes = sum(len(c) for c in contents) + logger.info(f"[GIT_PERF] _download_repo parallel download took {download_time:.2f}ms files={len(files)} bytes={total_bytes}") + + # Write all files to disk + t0 = time.perf_counter() + for (_, local_path), content in zip(file_info, contents): + with open(local_path, "wb") as f: + f.write(content) + write_time = (time.perf_counter() - t0) * 1000 + + total_time = (time.perf_counter() - t_start) * 1000 + 
logger.info( + f"[GIT_PERF] _download_repo TOTAL {total_time:.2f}ms " + f"files={len(files)} bytes={total_bytes} " + f"download_time={download_time:.2f}ms write_time={write_time:.2f}ms" + ) + + return repo_path + + async def get_files( + self, + agent_id: str, + org_id: str, + ref: str = "HEAD", + ) -> Dict[str, str]: + """Get all files at a specific ref. + + Args: + agent_id: Agent ID + org_id: Organization ID + ref: Git ref (commit SHA, branch name, or 'HEAD') + + Returns: + Dict mapping file paths to content + """ + dulwich = self._get_dulwich() + repo_path = await self._download_repo(agent_id, org_id) + + try: + + def _get_files(): + repo = dulwich.repo.Repo(repo_path) + + # Resolve ref to commit + if ref == "HEAD": + commit_sha = repo.head() + else: + # Try as branch name first + try: + commit_sha = repo.refs[f"refs/heads/{ref}".encode()] + except KeyError: + # Try as commit SHA + commit_sha = ref.encode() if isinstance(ref, str) else ref + + commit = repo[commit_sha] + tree = repo[commit.tree] + + # Walk the tree and get all files + files = {} + self._walk_tree(repo, tree, "", files) + return files + + return await asyncio.to_thread(_get_files) + finally: + shutil.rmtree(os.path.dirname(repo_path), ignore_errors=True) + + def _walk_tree(self, repo, tree, prefix: str, files: Dict[str, str]) -> None: + """Recursively walk a git tree and collect files.""" + dulwich = self._get_dulwich() + for entry in tree.items(): + name = entry.path.decode() if isinstance(entry.path, bytes) else entry.path + path = f"{prefix}/{name}" if prefix else name + obj = repo[entry.sha] + + if isinstance(obj, dulwich.objects.Blob): + try: + files[path] = obj.data.decode("utf-8") + except UnicodeDecodeError: + # Skip binary files + pass + elif isinstance(obj, dulwich.objects.Tree): + self._walk_tree(repo, obj, path, files) + + async def commit( + self, + agent_id: str, + org_id: str, + changes: List[FileChange], + message: str, + author_name: str = "Letta Agent", + author_email: str = 
"agent@letta.ai", + branch: str = "main", + ) -> MemoryCommit: + """Commit changes to the repository. + + Uses a Redis lock to prevent concurrent modifications. + + Args: + agent_id: Agent ID + org_id: Organization ID + changes: List of file changes + message: Commit message + author_name: Author name + author_email: Author email + branch: Branch to commit to + + Returns: + MemoryCommit with commit details + + Raises: + MemoryRepoBusyError: If another operation is in progress + """ + t_start = time.perf_counter() + logger.info(f"[GIT_PERF] GitOperations.commit START agent={agent_id} changes={len(changes)}") + + # Acquire lock to prevent concurrent modifications + t0 = time.perf_counter() + redis_client = await get_redis_client() + lock_token = f"commit:{uuid.uuid4().hex}" + lock = await redis_client.acquire_memory_repo_lock(agent_id, lock_token) + logger.info(f"[GIT_PERF] acquire_memory_repo_lock took {(time.perf_counter() - t0) * 1000:.2f}ms") + + try: + t0 = time.perf_counter() + result = await self._commit_with_lock( + agent_id=agent_id, + org_id=org_id, + changes=changes, + message=message, + author_name=author_name, + author_email=author_email, + branch=branch, + ) + logger.info(f"[GIT_PERF] _commit_with_lock took {(time.perf_counter() - t0) * 1000:.2f}ms") + + total_time = (time.perf_counter() - t_start) * 1000 + logger.info(f"[GIT_PERF] GitOperations.commit TOTAL {total_time:.2f}ms") + return result + finally: + # Release lock + t0 = time.perf_counter() + if lock: + try: + await lock.release() + except Exception as e: + logger.warning(f"Failed to release lock for agent {agent_id}: {e}") + await redis_client.release_memory_repo_lock(agent_id) + logger.info(f"[GIT_PERF] lock release took {(time.perf_counter() - t0) * 1000:.2f}ms") + + async def _commit_with_lock( + self, + agent_id: str, + org_id: str, + changes: List[FileChange], + message: str, + author_name: str = "Letta Agent", + author_email: str = "agent@letta.ai", + branch: str = "main", + ) -> 
MemoryCommit: + """Internal commit implementation (called while holding lock).""" + t_start = time.perf_counter() + dulwich = self._get_dulwich() + + # Download repo from GCS to temp dir + t0 = time.perf_counter() + repo_path = await self._download_repo(agent_id, org_id) + download_time = (time.perf_counter() - t0) * 1000 + logger.info(f"[GIT_PERF] _commit_with_lock download phase took {download_time:.2f}ms") + + try: + + def _commit(): + t_git_start = time.perf_counter() + repo = dulwich.repo.Repo(repo_path) + + # Checkout the working directory + t0_reset = time.perf_counter() + dulwich.porcelain.reset(repo, "hard") + reset_time = (time.perf_counter() - t0_reset) * 1000 + + # Apply changes + files_changed = [] + additions = 0 + deletions = 0 + apply_time = 0 + + for change in changes: + t0_apply = time.perf_counter() + file_path = change.path.lstrip("/") + full_path = os.path.join(repo_path, file_path) + + if change.change_type == "delete" or change.content is None: + # Delete file + if os.path.exists(full_path): + with open(full_path, "r") as f: + deletions += len(f.read()) + os.remove(full_path) + dulwich.porcelain.remove(repo_path, paths=[file_path]) + else: + # Add or modify file + os.makedirs(os.path.dirname(full_path), exist_ok=True) + + # Calculate additions/deletions + if os.path.exists(full_path): + with open(full_path, "r") as f: + old_content = f.read() + deletions += len(old_content) + additions += len(change.content) + + with open(full_path, "w", encoding="utf-8") as f: + f.write(change.content) + dulwich.porcelain.add(repo_path, paths=[file_path]) + + files_changed.append(file_path) + apply_time += (time.perf_counter() - t0_apply) * 1000 + + # Get parent SHA + try: + parent_sha = repo.head().decode() + except Exception: + parent_sha = None + + # Create commit using porcelain (dulwich 1.0+ API) + t0_commit = time.perf_counter() + commit_sha = dulwich.porcelain.commit( + repo_path, + message=message.encode(), + committer=f"{author_name} 
<{author_email}>".encode(), + author=f"{author_name} <{author_email}>".encode(), + ) + commit_time = (time.perf_counter() - t0_commit) * 1000 + + sha_str = commit_sha.decode() if isinstance(commit_sha, bytes) else str(commit_sha) + + git_total = (time.perf_counter() - t_git_start) * 1000 + logger.info( + f"[GIT_PERF] _commit git operations: reset={reset_time:.2f}ms " + f"apply_changes={apply_time:.2f}ms commit={commit_time:.2f}ms total={git_total:.2f}ms" + ) + + return MemoryCommit( + sha=sha_str, + parent_sha=parent_sha, + message=message, + author_type="agent" if "agent" in author_email.lower() else "user", + author_id=agent_id, + author_name=author_name, + timestamp=datetime.now(timezone.utc), + files_changed=files_changed, + additions=additions, + deletions=deletions, + ) + + t0 = time.perf_counter() + commit = await asyncio.to_thread(_commit) + git_thread_time = (time.perf_counter() - t0) * 1000 + logger.info(f"[GIT_PERF] _commit_with_lock git thread took {git_thread_time:.2f}ms") + + # Upload the updated repo + t0 = time.perf_counter() + await self._upload_repo(repo_path, agent_id, org_id) + upload_time = (time.perf_counter() - t0) * 1000 + logger.info(f"[GIT_PERF] _commit_with_lock upload phase took {upload_time:.2f}ms") + + total_time = (time.perf_counter() - t_start) * 1000 + logger.info( + f"[GIT_PERF] _commit_with_lock TOTAL {total_time:.2f}ms " + f"(download={download_time:.2f}ms git={git_thread_time:.2f}ms upload={upload_time:.2f}ms)" + ) + + return commit + finally: + t0 = time.perf_counter() + shutil.rmtree(os.path.dirname(repo_path), ignore_errors=True) + logger.info(f"[GIT_PERF] cleanup temp dir took {(time.perf_counter() - t0) * 1000:.2f}ms") + + async def get_history( + self, + agent_id: str, + org_id: str, + path: Optional[str] = None, + limit: int = 50, + ) -> List[MemoryCommit]: + """Get commit history. 
+ + Args: + agent_id: Agent ID + org_id: Organization ID + path: Optional file path to filter by + limit: Maximum number of commits to return + + Returns: + List of commits, newest first + """ + dulwich = self._get_dulwich() + repo_path = await self._download_repo(agent_id, org_id) + + try: + + def _get_history(): + repo = dulwich.repo.Repo(repo_path) + commits = [] + + # Walk the commit history + walker = repo.get_walker(max_entries=limit) + + for entry in walker: + commit = entry.commit + sha = commit.id.decode() if isinstance(commit.id, bytes) else str(commit.id) + parent_sha = commit.parents[0].decode() if commit.parents else None + + # Parse author + author_str = commit.author.decode() if isinstance(commit.author, bytes) else commit.author + author_name = author_str.split("<")[0].strip() if "<" in author_str else author_str + + commits.append( + MemoryCommit( + sha=sha, + parent_sha=parent_sha, + message=commit.message.decode() if isinstance(commit.message, bytes) else commit.message, + author_type="system", + author_id="", + author_name=author_name, + timestamp=datetime.fromtimestamp(commit.commit_time, tz=timezone.utc), + files_changed=[], # Would need to compute diff for this + additions=0, + deletions=0, + ) + ) + + return commits + + return await asyncio.to_thread(_get_history) + finally: + shutil.rmtree(os.path.dirname(repo_path), ignore_errors=True) + + async def get_head_sha(self, agent_id: str, org_id: str) -> str: + """Get the current HEAD commit SHA. 
+ + Args: + agent_id: Agent ID + org_id: Organization ID + + Returns: + HEAD commit SHA + """ + dulwich = self._get_dulwich() + repo_path = await self._download_repo(agent_id, org_id) + + try: + + def _get_head(): + repo = dulwich.repo.Repo(repo_path) + head = repo.head() + return head.decode() if isinstance(head, bytes) else str(head) + + return await asyncio.to_thread(_get_head) + finally: + shutil.rmtree(os.path.dirname(repo_path), ignore_errors=True) + + async def delete_repo(self, agent_id: str, org_id: str) -> None: + """Delete an agent's repository from storage. + + Args: + agent_id: Agent ID + org_id: Organization ID + """ + storage_prefix = self._repo_path(agent_id, org_id) + await self.storage.delete_prefix(storage_prefix) + logger.info(f"Deleted repository for agent {agent_id}") diff --git a/letta/services/memory_repo/storage/__init__.py b/letta/services/memory_repo/storage/__init__.py new file mode 100644 index 00000000..2387b3d3 --- /dev/null +++ b/letta/services/memory_repo/storage/__init__.py @@ -0,0 +1,9 @@ +"""Storage backends for memory repositories.""" + +from letta.services.memory_repo.storage.base import StorageBackend +from letta.services.memory_repo.storage.gcs import GCSStorageBackend + +__all__ = [ + "StorageBackend", + "GCSStorageBackend", +] diff --git a/letta/services/memory_repo/storage/base.py b/letta/services/memory_repo/storage/base.py new file mode 100644 index 00000000..2fd97dc6 --- /dev/null +++ b/letta/services/memory_repo/storage/base.py @@ -0,0 +1,127 @@ +"""Abstract base class for storage backends.""" + +from abc import ABC, abstractmethod +from typing import AsyncIterator, List, Optional + + +class StorageBackend(ABC): + """Abstract storage backend for memory repositories. + + Provides a unified interface for storing git repository objects + in various object storage systems (GCS, S3, local filesystem). 
class StorageBackend(ABC):
    """Abstract storage backend for memory repositories.

    Provides a unified interface for storing git repository objects
    in various object storage systems (GCS, S3, local filesystem).
    """

    @property
    @abstractmethod
    def bucket_name(self) -> str:
        """Return the bucket/container name."""
        pass

    @abstractmethod
    async def upload_bytes(self, path: str, content: bytes) -> None:
        """Upload bytes to the given path.

        Args:
            path: Path within the bucket (e.g., "org-123/agent-456/objects/pack/pack-abc.pack")
            content: Raw bytes to upload
        """
        pass

    @abstractmethod
    async def download_bytes(self, path: str) -> bytes:
        """Download bytes from the given path.

        Args:
            path: Path within the bucket

        Returns:
            Raw bytes content

        Raises:
            FileNotFoundError: If the path doesn't exist
        """
        pass

    @abstractmethod
    async def exists(self, path: str) -> bool:
        """Check if a path exists.

        Args:
            path: Path within the bucket

        Returns:
            True if the path exists
        """
        pass

    @abstractmethod
    async def delete(self, path: str) -> None:
        """Delete a file at the given path.

        Args:
            path: Path within the bucket

        Raises:
            FileNotFoundError: If the path doesn't exist
        """
        pass

    @abstractmethod
    async def list_files(self, prefix: str) -> List[str]:
        """List all files with the given prefix.

        Args:
            prefix: Path prefix to filter by

        Returns:
            List of full paths matching the prefix
        """
        pass

    @abstractmethod
    async def delete_prefix(self, prefix: str) -> int:
        """Delete all files with the given prefix.

        Args:
            prefix: Path prefix to delete

        Returns:
            Number of files deleted
        """
        pass

    # --- Concrete convenience helpers built on the abstract byte primitives ---

    async def upload_text(self, path: str, content: str, encoding: str = "utf-8") -> None:
        """Upload text content to the given path.

        Args:
            path: Path within the bucket
            content: Text content to upload
            encoding: Text encoding (default: utf-8)
        """
        encoded = content.encode(encoding)
        await self.upload_bytes(path, encoded)

    async def download_text(self, path: str, encoding: str = "utf-8") -> str:
        """Download text content from the given path.

        Args:
            path: Path within the bucket
            encoding: Text encoding (default: utf-8)

        Returns:
            Text content
        """
        raw = await self.download_bytes(path)
        return raw.decode(encoding)

    async def copy(self, source_path: str, dest_path: str) -> None:
        """Copy a file from source to destination.

        Default implementation downloads and re-uploads.
        Subclasses may override with more efficient implementations.

        Args:
            source_path: Source path
            dest_path: Destination path
        """
        payload = await self.download_bytes(source_path)
        await self.upload_bytes(dest_path, payload)
+ object_store_project: str | None = Field( + default=None, + validation_alias=AliasChoices("LETTA_OBJECT_STORE_PROJECT"), + description="Optional project override for object store clients (e.g., GCS project).", + ) # multi agent settings multi_agent_send_message_max_retries: int = 3 multi_agent_send_message_timeout: int = 20 * 60 diff --git a/pyproject.toml b/pyproject.toml index a6119383..af3c70a5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -112,6 +112,12 @@ bedrock = [ "aioboto3>=14.3.0", ] +# ====== Git State (git-backed memory repos) ====== +git-state = [ + "google-cloud-storage>=2.10.0", + "dulwich>=0.22.0", +] + # ====== Development ====== dev = [ "pytest", diff --git a/uv.lock b/uv.lock index 8028a7a9..73a762d1 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.11, <3.14" resolution-markers = [ "python_full_version >= '3.13'", @@ -1146,6 +1146,36 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/02/10/5da547df7a391dcde17f59520a231527b8571e6f46fc8efb02ccb370ab12/docutils-0.22.4-py3-none-any.whl", hash = "sha256:d0013f540772d1420576855455d050a2180186c91c15779301ac2ccb3eeb68de", size = 633196, upload-time = "2025-12-18T19:00:18.077Z" }, ] +[[package]] +name = "dulwich" +version = "1.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "typing-extensions", marker = "python_full_version < '3.12'" }, + { name = "urllib3" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/ee/df/4178b6465e118e6e74fd78774b451953dd53c09fdec18f2c4b3319dd0485/dulwich-1.0.0.tar.gz", hash = "sha256:3d07104735525f22bfec35514ac611cf328c89b7acb059316a4f6e583c8f09bc", size = 1135862, upload-time = "2026-01-17T23:44:16.357Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/66/fa/99a422ac3bca08eab07a537c86dce12b6ce20b72cf5a14bef5cdb122eddf/dulwich-1.0.0-cp311-cp311-macosx_11_0_arm64.whl", hash = 
"sha256:1213da9832621b07dfaafdb651b74edb8966481475c52be0bff8dee352d75853", size = 1336935, upload-time = "2026-01-17T23:43:38.84Z" }, + { url = "https://files.pythonhosted.org/packages/86/54/1739af06492a4e98b5c96aa3e22d0b58fda282c10849db733ee8c52f423d/dulwich-1.0.0-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:e370e3cdd0b00c059ebee8371cc1644aa61d6de3de0ca5c2f2a5f075bf4c53d9", size = 1400229, upload-time = "2026-01-17T23:43:41.03Z" }, + { url = "https://files.pythonhosted.org/packages/0c/76/efde5050ae9422cf418bee98d3d35dc99935fb076679100e558491e691c9/dulwich-1.0.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:86271e17d76a667abb1d68dad83b6324422a1ab20d60be30395fd60a37b735b1", size = 1428812, upload-time = "2026-01-17T23:43:43.412Z" }, + { url = "https://files.pythonhosted.org/packages/02/82/f166b206db70db11fb222abeb661b2879ea10f32ad86c85949e5a4fba26a/dulwich-1.0.0-cp311-cp311-win32.whl", hash = "sha256:3051007bc2792b5a72fee938842cf45b66924d6d5147d824f3e609eb75fc0322", size = 985517, upload-time = "2026-01-17T23:43:45.409Z" }, + { url = "https://files.pythonhosted.org/packages/e7/b7/3f8c0059fc8a0eba22e8bb9cec7e2b4e514bc75ede83a320570c5de17599/dulwich-1.0.0-cp311-cp311-win_amd64.whl", hash = "sha256:5cf6e9b5620a3e842663b58ad534da29944db6a6016ba61fc9bbed24830cd85f", size = 1001981, upload-time = "2026-01-17T23:43:47.29Z" }, + { url = "https://files.pythonhosted.org/packages/cc/54/78054a9fd62aa7b1484e97673435cae494cad5d04f020d4571c47e9a2875/dulwich-1.0.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:6736abc2ce2994e38a00a3a4c80237b2b944e7c6f4e346119debdd2592312d83", size = 1316278, upload-time = "2026-01-17T23:43:49.542Z" }, + { url = "https://files.pythonhosted.org/packages/30/20/b2140acf9431c8c862c200cd880b9e5cce8dbe9914324bf238ed92574aea/dulwich-1.0.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:06514b02da1e32a077062924d2c3b20a7bc76ab9b92eeac691f72b76b14111bc", size = 1393024, upload-time = "2026-01-17T23:43:51.453Z" }, + { url = 
"https://files.pythonhosted.org/packages/69/71/ad744b9802f222dc364a851bd6130c17809b3472a81a16aefd7d3196f22a/dulwich-1.0.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:32b6fb1205b1d9c0e43986f9e4e5e50a3670014440e61498eca2b8ab6b00129f", size = 1421022, upload-time = "2026-01-17T23:43:53.053Z" }, + { url = "https://files.pythonhosted.org/packages/21/c0/dfcd795a6b516b9e24aa4339dcc9cdd5ceffe007ad397e5b4938f9793981/dulwich-1.0.0-cp312-cp312-win32.whl", hash = "sha256:1a6583499b915fe5a8ac5595325f1e6a6a5a456de1575e0293e8a6ebb6915f3f", size = 980617, upload-time = "2026-01-17T23:43:54.642Z" }, + { url = "https://files.pythonhosted.org/packages/7c/d4/11075795cc8ab48c771c997fdefef612775ef2582c4710a8fba6ca987500/dulwich-1.0.0-cp312-cp312-win_amd64.whl", hash = "sha256:f90b54faeb995607c876cdb2c082c0f0af702e1ccb524c6126ce99a36536fa3f", size = 998048, upload-time = "2026-01-17T23:43:56.176Z" }, + { url = "https://files.pythonhosted.org/packages/97/82/5ce63c7a2ac8d756bc7477298633e420632eed97ea645ecea13210e9b1a7/dulwich-1.0.0-cp313-cp313-android_21_arm64_v8a.whl", hash = "sha256:ff94f47f0b5787d4e6a0105daf51ff9cdb4e5b9d4e9f8dd01b58ba9a5b79bbd9", size = 1417766, upload-time = "2026-01-17T23:43:57.855Z" }, + { url = "https://files.pythonhosted.org/packages/b9/71/7d4ecdf9e0da21ceec3ac05b03c2cac8cf2271a52172fd55dd65a9faa9e7/dulwich-1.0.0-cp313-cp313-android_21_x86_64.whl", hash = "sha256:1d95663441c930631d9d1765dc4f427dcc0662af45f42a0831357e60055ddb84", size = 1417760, upload-time = "2026-01-17T23:43:59.42Z" }, + { url = "https://files.pythonhosted.org/packages/09/3d/0486cefda75c7e9ea8d8dbdeaa014d618e694bc75734f073927135b37a4b/dulwich-1.0.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:78542a62fabea894943a1d01c9c477a56eee5f7d58d3bdee42c7e0622ddf6893", size = 1316186, upload-time = "2026-01-17T23:44:01.334Z" }, + { url = 
"https://files.pythonhosted.org/packages/f7/a7/a24c6e1e9f7e5a2ee8f9e362e2c3e5d864cc2b69f04d02bedf82673f31c3/dulwich-1.0.0-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:d1c33f6456e4335dfe6f4d3917fa7d77050d6470bbbaf8054b5c5084ee8e8cd1", size = 1392530, upload-time = "2026-01-17T23:44:03.655Z" }, + { url = "https://files.pythonhosted.org/packages/d4/03/1ff9dbda655fc714528786e3fdbbe16278bbefc02b9836e91a38620aa616/dulwich-1.0.0-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:581330cf799577f194fda2b5384b7ba50e095de7ff088779c027a6de63642de2", size = 1420386, upload-time = "2026-01-17T23:44:05.844Z" }, + { url = "https://files.pythonhosted.org/packages/f0/ca/72e7cdde2ee0a4f858166ba8eb81a0d89f61762d9114bd7a358798892fc9/dulwich-1.0.0-cp313-cp313-win32.whl", hash = "sha256:276ff18ae734fe4a1be66d4267216a51d2deab0ac981d722db3d32fcc2ac4ff8", size = 981425, upload-time = "2026-01-17T23:44:07.373Z" }, + { url = "https://files.pythonhosted.org/packages/d7/27/8d4bed76ce983052e259da25255fed85b48ad30a34b4e4b7c8f518fdbc30/dulwich-1.0.0-cp313-cp313-win_amd64.whl", hash = "sha256:cc0ab4ba7fd8617bebe20294dedaa8f713d1767ce059bfbefd971b911b702726", size = 998055, upload-time = "2026-01-17T23:44:08.908Z" }, + { url = "https://files.pythonhosted.org/packages/f9/99/4543953d2f7c1a940c1373362a70d253b85860be64b4ef8885bf8bfb340b/dulwich-1.0.0-py3-none-any.whl", hash = "sha256:221be803b71b060c928e9faae4ab3e259ff5beac6e0c251ba3c176b51b5c2ffb", size = 647950, upload-time = "2026-01-17T23:44:14.449Z" }, +] + [[package]] name = "e2b" version = "2.0.0" @@ -1598,6 +1628,22 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a8/73/2e03125170485193fcc99ef23b52749543d6c6711706d58713fe315869c4/geventhttpclient-2.3.4-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:5f71c75fc138331cbbe668a08951d36b641d2c26fb3677d7e497afb8419538db", size = 49011, upload-time = "2025-06-11T13:18:05.702Z" }, ] +[[package]] +name = "google-api-core" +version = "2.29.0" +source = { registry = 
"https://pypi.org/simple" } +dependencies = [ + { name = "google-auth" }, + { name = "googleapis-common-protos" }, + { name = "proto-plus" }, + { name = "protobuf" }, + { name = "requests" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/0d/10/05572d33273292bac49c2d1785925f7bc3ff2fe50e3044cf1062c1dde32e/google_api_core-2.29.0.tar.gz", hash = "sha256:84181be0f8e6b04006df75ddfe728f24489f0af57c96a529ff7cf45bc28797f7", size = 177828, upload-time = "2026-01-08T22:21:39.269Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/77/b6/85c4d21067220b9a78cfb81f516f9725ea6befc1544ec9bd2c1acd97c324/google_api_core-2.29.0-py3-none-any.whl", hash = "sha256:d30bc60980daa36e314b5d5a3e5958b0200cb44ca8fa1be2b614e932b75a3ea9", size = 173906, upload-time = "2026-01-08T22:21:36.093Z" }, +] + [[package]] name = "google-auth" version = "2.40.3" @@ -1612,6 +1658,61 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/17/63/b19553b658a1692443c62bd07e5868adaa0ad746a0751ba62c59568cd45b/google_auth-2.40.3-py2.py3-none-any.whl", hash = "sha256:1370d4593e86213563547f97a92752fc658456fe4514c809544f330fed45a7ca", size = 216137, upload-time = "2025-06-04T18:04:55.573Z" }, ] +[[package]] +name = "google-cloud-core" +version = "2.5.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "google-api-core" }, + { name = "google-auth" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/a6/03/ef0bc99d0e0faf4fdbe67ac445e18cdaa74824fd93cd069e7bb6548cb52d/google_cloud_core-2.5.0.tar.gz", hash = "sha256:7c1b7ef5c92311717bd05301aa1a91ffbc565673d3b0b4163a52d8413a186963", size = 36027, upload-time = "2025-10-29T23:17:39.513Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/89/20/bfa472e327c8edee00f04beecc80baeddd2ab33ee0e86fd7654da49d45e9/google_cloud_core-2.5.0-py3-none-any.whl", hash = "sha256:67d977b41ae6c7211ee830c7912e41003ea8194bff15ae7d72fd6f51e57acabc", size = 29469, upload-time = "2025-10-29T23:17:38.548Z" }, 
+] + +[[package]] +name = "google-cloud-storage" +version = "3.8.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "google-api-core" }, + { name = "google-auth" }, + { name = "google-cloud-core" }, + { name = "google-crc32c" }, + { name = "google-resumable-media" }, + { name = "requests" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/a8/90/4398cecc2704cb066bc7dee6111a5c93c59bcd6fb751f0541315655774a8/google_cloud_storage-3.8.0.tar.gz", hash = "sha256:cc67952dce84ebc9d44970e24647a58260630b7b64d72360cedaf422d6727f28", size = 17273792, upload-time = "2026-01-14T00:45:31.289Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/11/db/326279870d349fb9592263343dca4ad76088c17c88ba97b0f64c1088276c/google_cloud_storage-3.8.0-py3-none-any.whl", hash = "sha256:78cfeae7cac2ca9441d0d0271c2eb4ebfa21aa4c6944dd0ccac0389e81d955a7", size = 312430, upload-time = "2026-01-14T00:45:28.689Z" }, +] + +[[package]] +name = "google-crc32c" +version = "1.8.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/03/41/4b9c02f99e4c5fb477122cd5437403b552873f014616ac1d19ac8221a58d/google_crc32c-1.8.0.tar.gz", hash = "sha256:a428e25fb7691024de47fecfbff7ff957214da51eddded0da0ae0e0f03a2cf79", size = 14192, upload-time = "2025-12-16T00:35:25.142Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5d/ef/21ccfaab3d5078d41efe8612e0ed0bfc9ce22475de074162a91a25f7980d/google_crc32c-1.8.0-cp311-cp311-macosx_12_0_arm64.whl", hash = "sha256:014a7e68d623e9a4222d663931febc3033c5c7c9730785727de2a81f87d5bab8", size = 31298, upload-time = "2025-12-16T00:20:32.241Z" }, + { url = "https://files.pythonhosted.org/packages/c5/b8/f8413d3f4b676136e965e764ceedec904fe38ae8de0cdc52a12d8eb1096e/google_crc32c-1.8.0-cp311-cp311-macosx_12_0_x86_64.whl", hash = "sha256:86cfc00fe45a0ac7359e5214a1704e51a99e757d0272554874f419f79838c5f7", size = 30872, upload-time = "2025-12-16T00:33:58.785Z" }, + { 
url = "https://files.pythonhosted.org/packages/f6/fd/33aa4ec62b290477181c55bb1c9302c9698c58c0ce9a6ab4874abc8b0d60/google_crc32c-1.8.0-cp311-cp311-manylinux1_x86_64.manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:19b40d637a54cb71e0829179f6cb41835f0fbd9e8eb60552152a8b52c36cbe15", size = 33243, upload-time = "2025-12-16T00:40:21.46Z" }, + { url = "https://files.pythonhosted.org/packages/71/03/4820b3bd99c9653d1a5210cb32f9ba4da9681619b4d35b6a052432df4773/google_crc32c-1.8.0-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:17446feb05abddc187e5441a45971b8394ea4c1b6efd88ab0af393fd9e0a156a", size = 33608, upload-time = "2025-12-16T00:40:22.204Z" }, + { url = "https://files.pythonhosted.org/packages/7c/43/acf61476a11437bf9733fb2f70599b1ced11ec7ed9ea760fdd9a77d0c619/google_crc32c-1.8.0-cp311-cp311-win_amd64.whl", hash = "sha256:71734788a88f551fbd6a97be9668a0020698e07b2bf5b3aa26a36c10cdfb27b2", size = 34439, upload-time = "2025-12-16T00:35:20.458Z" }, + { url = "https://files.pythonhosted.org/packages/e9/5f/7307325b1198b59324c0fa9807cafb551afb65e831699f2ce211ad5c8240/google_crc32c-1.8.0-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:4b8286b659c1335172e39563ab0a768b8015e88e08329fa5321f774275fc3113", size = 31300, upload-time = "2025-12-16T00:21:56.723Z" }, + { url = "https://files.pythonhosted.org/packages/21/8e/58c0d5d86e2220e6a37befe7e6a94dd2f6006044b1a33edf1ff6d9f7e319/google_crc32c-1.8.0-cp312-cp312-macosx_12_0_x86_64.whl", hash = "sha256:2a3dc3318507de089c5384cc74d54318401410f82aa65b2d9cdde9d297aca7cb", size = 30867, upload-time = "2025-12-16T00:38:31.302Z" }, + { url = "https://files.pythonhosted.org/packages/ce/a9/a780cc66f86335a6019f557a8aaca8fbb970728f0efd2430d15ff1beae0e/google_crc32c-1.8.0-cp312-cp312-manylinux1_x86_64.manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:14f87e04d613dfa218d6135e81b78272c3b904e2a7053b841481b38a7d901411", size = 33364, upload-time = 
"2025-12-16T00:40:22.96Z" }, + { url = "https://files.pythonhosted.org/packages/21/3f/3457ea803db0198c9aaca2dd373750972ce28a26f00544b6b85088811939/google_crc32c-1.8.0-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:cb5c869c2923d56cb0c8e6bcdd73c009c36ae39b652dbe46a05eb4ef0ad01454", size = 33740, upload-time = "2025-12-16T00:40:23.96Z" }, + { url = "https://files.pythonhosted.org/packages/df/c0/87c2073e0c72515bb8733d4eef7b21548e8d189f094b5dad20b0ecaf64f6/google_crc32c-1.8.0-cp312-cp312-win_amd64.whl", hash = "sha256:3cc0c8912038065eafa603b238abf252e204accab2a704c63b9e14837a854962", size = 34437, upload-time = "2025-12-16T00:35:21.395Z" }, + { url = "https://files.pythonhosted.org/packages/d1/db/000f15b41724589b0e7bc24bc7a8967898d8d3bc8caf64c513d91ef1f6c0/google_crc32c-1.8.0-cp313-cp313-macosx_12_0_arm64.whl", hash = "sha256:3ebb04528e83b2634857f43f9bb8ef5b2bbe7f10f140daeb01b58f972d04736b", size = 31297, upload-time = "2025-12-16T00:23:20.709Z" }, + { url = "https://files.pythonhosted.org/packages/d7/0d/8ebed0c39c53a7e838e2a486da8abb0e52de135f1b376ae2f0b160eb4c1a/google_crc32c-1.8.0-cp313-cp313-macosx_12_0_x86_64.whl", hash = "sha256:450dc98429d3e33ed2926fc99ee81001928d63460f8538f21a5d6060912a8e27", size = 30867, upload-time = "2025-12-16T00:43:14.628Z" }, + { url = "https://files.pythonhosted.org/packages/ce/42/b468aec74a0354b34c8cbf748db20d6e350a68a2b0912e128cabee49806c/google_crc32c-1.8.0-cp313-cp313-manylinux1_x86_64.manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:3b9776774b24ba76831609ffbabce8cdf6fa2bd5e9df37b594221c7e333a81fa", size = 33344, upload-time = "2025-12-16T00:40:24.742Z" }, + { url = "https://files.pythonhosted.org/packages/1c/e8/b33784d6fc77fb5062a8a7854e43e1e618b87d5ddf610a88025e4de6226e/google_crc32c-1.8.0-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:89c17d53d75562edfff86679244830599ee0a48efc216200691de8b02ab6b2b8", size = 33694, upload-time = 
"2025-12-16T00:40:25.505Z" }, + { url = "https://files.pythonhosted.org/packages/92/b1/d3cbd4d988afb3d8e4db94ca953df429ed6db7282ed0e700d25e6c7bfc8d/google_crc32c-1.8.0-cp313-cp313-win_amd64.whl", hash = "sha256:57a50a9035b75643996fbf224d6661e386c7162d1dfdab9bc4ca790947d1007f", size = 34435, upload-time = "2025-12-16T00:35:22.107Z" }, + { url = "https://files.pythonhosted.org/packages/52/c5/c171e4d8c44fec1422d801a6d2e5d7ddabd733eeda505c79730ee9607f07/google_crc32c-1.8.0-pp311-pypy311_pp73-manylinux1_x86_64.manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:87fa445064e7db928226b2e6f0d5304ab4cd0339e664a4e9a25029f384d9bb93", size = 28615, upload-time = "2025-12-16T00:40:29.298Z" }, + { url = "https://files.pythonhosted.org/packages/9c/97/7d75fe37a7a6ed171a2cf17117177e7aab7e6e0d115858741b41e9dd4254/google_crc32c-1.8.0-pp311-pypy311_pp73-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:f639065ea2042d5c034bf258a9f085eaa7af0cd250667c0635a3118e8f92c69c", size = 28800, upload-time = "2025-12-16T00:40:30.322Z" }, +] + [[package]] name = "google-genai" version = "1.52.0" @@ -1631,6 +1732,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ec/66/03f663e7bca7abe9ccfebe6cb3fe7da9a118fd723a5abb278d6117e7990e/google_genai-1.52.0-py3-none-any.whl", hash = "sha256:c8352b9f065ae14b9322b949c7debab8562982f03bf71d44130cd2b798c20743", size = 261219, upload-time = "2025-11-21T02:18:54.515Z" }, ] +[[package]] +name = "google-resumable-media" +version = "2.8.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "google-crc32c" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/64/d7/520b62a35b23038ff005e334dba3ffc75fcf583bee26723f1fd8fd4b6919/google_resumable_media-2.8.0.tar.gz", hash = "sha256:f1157ed8b46994d60a1bc432544db62352043113684d4e030ee02e77ebe9a1ae", size = 2163265, upload-time = "2025-11-17T15:38:06.659Z" } +wheels = [ + { url = 
"https://files.pythonhosted.org/packages/1f/0b/93afde9cfe012260e9fe1522f35c9b72d6ee222f316586b1f23ecf44d518/google_resumable_media-2.8.0-py3-none-any.whl", hash = "sha256:dd14a116af303845a8d932ddae161a26e86cc229645bc98b39f026f9b1717582", size = 81340, upload-time = "2025-11-17T15:38:05.594Z" }, +] + [[package]] name = "googleapis-common-protos" version = "1.70.0" @@ -2631,6 +2744,10 @@ external-tools = [ { name = "turbopuffer" }, { name = "wikipedia" }, ] +git-state = [ + { name = "dulwich" }, + { name = "google-cloud-storage" }, +] modal = [ { name = "modal" }, ] @@ -2687,6 +2804,7 @@ requires-dist = [ { name = "docker", marker = "extra == 'desktop'", specifier = ">=7.1.0" }, { name = "docker", marker = "extra == 'external-tools'", specifier = ">=7.1.0" }, { name = "docstring-parser", specifier = ">=0.16,<0.17" }, + { name = "dulwich", marker = "extra == 'git-state'", specifier = ">=0.22.0" }, { name = "e2b-code-interpreter", marker = "extra == 'cloud-tool-sandbox'", specifier = ">=1.0.3" }, { name = "exa-py", specifier = ">=1.15.4" }, { name = "exa-py", marker = "extra == 'external-tools'", specifier = ">=1.15.4" }, @@ -2694,6 +2812,7 @@ requires-dist = [ { name = "fastapi", marker = "extra == 'desktop'", specifier = ">=0.115.6" }, { name = "fastapi", marker = "extra == 'server'", specifier = ">=0.115.6" }, { name = "fastmcp", specifier = ">=2.12.5" }, + { name = "google-cloud-storage", marker = "extra == 'git-state'", specifier = ">=2.10.0" }, { name = "google-genai", specifier = ">=1.52.0" }, { name = "granian", extras = ["uvloop", "reload"], marker = "extra == 'experimental'", specifier = ">=2.3.2" }, { name = "grpcio", specifier = ">=1.68.1" }, @@ -2780,7 +2899,7 @@ requires-dist = [ { name = "wikipedia", marker = "extra == 'desktop'", specifier = ">=1.4.0" }, { name = "wikipedia", marker = "extra == 'external-tools'", specifier = ">=1.4.0" }, ] -provides-extras = ["postgres", "redis", "pinecone", "sqlite", "experimental", "server", "bedrock", "dev", 
"cloud-tool-sandbox", "modal", "external-tools", "desktop", "profiling"] +provides-extras = ["postgres", "redis", "pinecone", "sqlite", "experimental", "server", "bedrock", "git-state", "dev", "cloud-tool-sandbox", "modal", "external-tools", "desktop", "profiling"] [[package]] name = "letta-client" @@ -4420,6 +4539,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/cc/35/cc0aaecf278bb4575b8555f2b137de5ab821595ddae9da9d3cd1da4072c7/propcache-0.3.2-py3-none-any.whl", hash = "sha256:98f1ec44fb675f5052cccc8e609c46ed23a35a1cfd18545ad4e29002d858a43f", size = 12663, upload-time = "2025-06-09T22:56:04.484Z" }, ] +[[package]] +name = "proto-plus" +version = "1.27.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "protobuf" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/01/89/9cbe2f4bba860e149108b683bc2efec21f14d5f7ed6e25562ad86acbc373/proto_plus-1.27.0.tar.gz", hash = "sha256:873af56dd0d7e91836aee871e5799e1c6f1bda86ac9a983e0bb9f0c266a568c4", size = 56158, upload-time = "2025-12-16T13:46:25.729Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/cd/24/3b7a0818484df9c28172857af32c2397b6d8fcd99d9468bd4684f98ebf0a/proto_plus-1.27.0-py3-none-any.whl", hash = "sha256:1baa7f81cf0f8acb8bc1f6d085008ba4171eaf669629d1b6d1673b21ed1c0a82", size = 50205, upload-time = "2025-12-16T13:46:24.76Z" }, +] + [[package]] name = "protobuf" version = "5.29.5"