feat: add zai proxy LET-6543 (#6836)

feat: add zai proxy
This commit is contained in:
cthomas
2025-12-12 16:24:48 -08:00
committed by Caren Thomas
parent bce1749408
commit efac48e9ea
5 changed files with 889 additions and 473 deletions

View File

@@ -0,0 +1,518 @@
"""
Shared helper functions for Anthropic-compatible proxy endpoints.
These helpers are used by both the Anthropic and Z.ai proxy routers to reduce code duplication.
"""
import asyncio
import json
from fastapi import Request
from letta.log import get_logger
from letta.server.rest_api.utils import capture_and_persist_messages
from letta.settings import model_settings
logger = get_logger(__name__)
def extract_user_messages(body: bytes) -> list[str]:
"""Extract user messages from request body."""
messages = []
try:
request_data = json.loads(body)
messages = request_data.get("messages", [])
user_messages = []
for msg in messages:
if msg.get("role") == "user":
content = msg.get("content", "")
if isinstance(content, str):
user_messages.append(content)
elif isinstance(content, list):
for block in content:
if isinstance(block, dict):
if block.get("type") == "text":
user_messages.append(block.get("text", ""))
elif block.get("type") == "image":
user_messages.append("[IMAGE]")
return user_messages
except Exception as e:
logger.warning(f"[Proxy Helpers] Failed to extract user messages from request {messages}: {e}")
return []
def extract_assistant_message(response_data: dict) -> str:
"""Extract assistant message from response data."""
content_blocks = []
try:
content_blocks = response_data.get("content", [])
text_parts = []
for block in content_blocks:
if isinstance(block, dict) and block.get("type") == "text":
text_parts.append(block.get("text", ""))
return "\n".join(text_parts)
except Exception as e:
logger.warning(f"[Proxy Helpers] Failed to extract assistant message from response {content_blocks}: {e}")
return ""
def is_topic_detection_response(message: str) -> bool:
"""
Check if the assistant message is a topic detection response (contains isNewTopic key).
These are Claude Code metadata responses that should not be persisted as conversation.
"""
try:
stripped = message.strip()
if stripped.startswith("{") and stripped.endswith("}"):
parsed = json.loads(stripped)
# Check for isNewTopic key which indicates topic detection
if "isNewTopic" in parsed:
return True
except (json.JSONDecodeError, AttributeError):
pass
return False
def prepare_headers(request: Request, proxy_name: str, use_bearer_auth: bool = False) -> dict | None:
"""
Prepare headers for forwarding to Anthropic-compatible API.
Args:
request: The incoming FastAPI request
proxy_name: Name of the proxy for logging (e.g., "Anthropic Proxy", "Z.ai Proxy")
use_bearer_auth: If True, convert x-api-key to Bearer token in Authorization header (for Z.ai)
Returns:
Dictionary of headers to forward, or None if authentication fails
"""
skip_headers = {
"host",
"connection",
"content-length",
"transfer-encoding",
"content-encoding",
"te",
"upgrade",
"proxy-authenticate",
"proxy-authorization",
"authorization",
}
headers = {}
for key, value in request.headers.items():
if key.lower() not in skip_headers:
headers[key] = value
# Extract API key from headers or fallback to letta's key
api_key = None
if "x-api-key" in headers:
api_key = headers["x-api-key"]
elif "anthropic-api-key" in headers:
api_key = headers["anthropic-api-key"]
else:
# Fallback to letta's anthropic api key if not provided
api_key = model_settings.anthropic_api_key
if api_key:
logger.info(f"[{proxy_name}] Falling back to Letta's anthropic api key instead of user's key")
# Handle authentication based on proxy type
if use_bearer_auth:
# Z.ai: use Bearer token in Authorization header
if api_key:
headers["authorization"] = f"Bearer {api_key}"
# Keep x-api-key in headers too (doesn't hurt)
if "x-api-key" not in headers and api_key:
headers["x-api-key"] = api_key
else:
# Anthropic: use x-api-key header
if api_key and "x-api-key" not in headers:
headers["x-api-key"] = api_key
if "content-type" not in headers:
headers["content-type"] = "application/json"
return headers
def format_memory_blocks(blocks, agent_id: str) -> str:
"""Format memory blocks for injection into system prompt."""
blocks_with_content = [block for block in blocks if block.value]
if not blocks_with_content:
return ""
memory_context = (
"<letta>\n"
"You have persistent memory powered by Letta that is maintained across conversations. "
"A background agent updates these memory blocks based on conversation content.\n"
"<memory_blocks>\n"
"The following memory blocks are currently engaged in your core memory unit:\n\n"
)
for idx, block in enumerate(blocks_with_content):
label = block.label or "block"
value = block.value or ""
desc = block.description or ""
chars_current = len(value)
limit = block.limit if block.limit is not None else 0
memory_context += f"<{label}>\n"
if desc:
memory_context += "<description>\n"
memory_context += f"{desc}\n"
memory_context += "</description>\n"
memory_context += "<metadata>\n"
memory_context += f"- chars_current={chars_current}\n"
memory_context += f"- chars_limit={limit}\n"
memory_context += "</metadata>\n"
memory_context += "<value>\n"
memory_context += f"{value}\n"
memory_context += "</value>\n"
memory_context += f"</{label}>\n"
if idx != len(blocks_with_content) - 1:
memory_context += "\n"
memory_context += "\n</memory_blocks>\n\n"
memory_context += (
"<memory_management>\n"
f"Users can view and edit their memory blocks at:\n"
f"https://app.letta.com/agents/{agent_id}\n\n"
"Share this link when users ask how to manage their memory, what you remember about them, or how to view, edit, or delete stored information.\n"
"</memory_management>\n\n"
"<documentation>\n"
"- Memory blocks: https://docs.letta.com/guides/agents/memory-blocks/index.md\n"
"- Full Letta documentation: https://docs.letta.com/llms.txt\n\n"
"Reference these when users ask how Letta memory works or want to learn more about the platform.\n"
"</documentation>\n"
"</letta>"
)
return memory_context
def build_response_from_chunks(chunks: list[bytes]) -> str:
"""Build complete response text from streaming chunks."""
try:
text_parts = []
full_data = b"".join(chunks).decode("utf-8")
for line in full_data.split("\n"):
if line.startswith("data: "):
data_str = line[6:] # Remove "data: " prefix
if data_str.strip() in ["[DONE]", ""]:
continue
try:
event_data = json.loads(data_str)
event_type = event_data.get("type")
if event_type == "content_block_delta":
delta = event_data.get("delta", {})
if delta.get("type") == "text_delta":
text_parts.append(delta.get("text", ""))
except json.JSONDecodeError:
continue
return "".join(text_parts)
except Exception as e:
logger.warning(f"[Proxy Helpers] Failed to build response from chunks: {e}")
return ""
async def inject_memory_context(
server,
agent,
actor,
request_data: dict,
proxy_name: str,
) -> dict:
"""
Inject memory context into the request system prompt.
Args:
server: SyncServer instance
agent: Agent to get memory from
actor: Actor performing the operation
request_data: Request data dictionary to modify
proxy_name: Name of the proxy for logging (e.g., "Anthropic Proxy", "Z.ai Proxy")
Returns:
Modified request data with memory context injected
"""
try:
messages = request_data.get("messages", [])
if not messages:
return request_data
memory_context = format_memory_blocks(agent.blocks, agent.id)
if not memory_context:
logger.debug(f"[{proxy_name}] No memory blocks found, skipping memory injection")
return request_data
block_count = len([b for b in agent.blocks if b.value])
logger.info(f"[{proxy_name}] Injecting {block_count} memory block(s) into request")
# Inject into system prompt
modified_data = request_data.copy()
# Check if there's already a system prompt
# Anthropic API accepts system as either a string or list of content blocks
existing_system = modified_data.get("system", "")
# Handle both string and list system prompts
if isinstance(existing_system, list):
# If it's a list, prepend our context as a text block
modified_data["system"] = existing_system + [{"type": "text", "text": memory_context.rstrip()}]
elif existing_system:
# If it's a non-empty string, prepend our context
modified_data["system"] = memory_context + existing_system
else:
# No existing system prompt
modified_data["system"] = memory_context.rstrip()
# Fix max_tokens if using extended thinking
# Anthropic requires max_tokens > thinking.budget_tokens
if "thinking" in modified_data and isinstance(modified_data["thinking"], dict):
budget_tokens = modified_data["thinking"].get("budget_tokens", 0)
current_max_tokens = modified_data.get("max_tokens", 0)
if budget_tokens > 0 and current_max_tokens <= budget_tokens:
# Set max_tokens to budget_tokens + reasonable buffer for response
# Claude Code typically uses budget_tokens around 10000-20000
modified_data["max_tokens"] = budget_tokens + 4096
logger.info(
f"[{proxy_name}] Adjusted max_tokens from {current_max_tokens} to {modified_data['max_tokens']} (thinking.budget_tokens={budget_tokens})"
)
return modified_data
except Exception as e:
logger.exception(f"[{proxy_name}] Failed to inject memory context: {e}")
return request_data
async def persist_messages_background(
server,
agent,
actor,
user_messages: list[str],
assistant_message: str,
model_name: str,
proxy_name: str,
):
"""
Background task to persist messages without blocking the response.
This runs asynchronously after the response is returned to minimize latency.
Args:
server: SyncServer instance
agent: Agent to persist messages for
actor: Actor performing the operation
user_messages: List of user messages to persist
assistant_message: Assistant message to persist
model_name: Model name for the messages
proxy_name: Name of the proxy for logging (e.g., "Anthropic Proxy", "Z.ai Proxy")
"""
try:
result = await capture_and_persist_messages(
server=server,
agent=agent,
actor=actor,
user_messages=user_messages,
assistant_message=assistant_message,
model=model_name,
)
if result.get("success"):
logger.info(f"[{proxy_name}] Persisted messages: {result['messages_created']} messages saved")
else:
logger.debug(f"[{proxy_name}] Skipped persistence: {result.get('reason', 'unknown')}")
except Exception as e:
logger.error(f"[{proxy_name}] Failed to persist messages in background: {e}")
async def check_for_duplicate_message(server, agent, actor, user_messages: list[str], proxy_name: str) -> list[str]:
"""
Check if the last user message is a duplicate of the most recent persisted message.
Returns a filtered list with duplicates removed to prevent race conditions.
Args:
server: SyncServer instance
agent: Agent to check messages for
actor: Actor performing the operation
user_messages: List of user messages to check
proxy_name: Name of the proxy for logging
Returns:
Filtered list of user messages (empty if duplicate detected)
"""
user_messages_to_persist = user_messages.copy() if user_messages else []
if user_messages_to_persist:
try:
from letta.schemas.enums import MessageRole
recent_messages = await server.message_manager.list_messages(
agent_id=agent.id,
actor=actor,
limit=5,
roles=[MessageRole.user],
ascending=False,
)
if recent_messages:
last_user_msg = recent_messages[0]
last_message_text = ""
if last_user_msg.content:
for content_block in last_user_msg.content:
if hasattr(content_block, "text"):
last_message_text += content_block.text
incoming_msg = user_messages_to_persist[-1]
if last_message_text and last_message_text == incoming_msg:
logger.info(f"[{proxy_name}] Skipping duplicate user message: {incoming_msg[:100]}...")
user_messages_to_persist = []
except Exception as e:
logger.warning(f"[{proxy_name}] Failed to check for duplicate messages: {e}")
return user_messages_to_persist
async def backfill_agent_project_id(server, agent, actor, project_id: str):
"""
Temporary helper to backfill project_id for legacy agents.
TODO(@caren): Remove this function after all existing Claude Code agents have been backfilled.
Args:
server: SyncServer instance
agent: Agent to update
actor: Actor performing the operation
project_id: Project ID to set
Returns:
Updated agent or original agent if update fails
"""
from letta.schemas.agent import UpdateAgent
try:
updated_agent = await server.update_agent_async(
agent_id=agent.id,
request=UpdateAgent(project_id=project_id),
actor=actor,
)
logger.info(f"[Backfill] Successfully updated agent {agent.id} with project_id {project_id}")
return updated_agent
except Exception as e:
logger.warning(f"[Backfill] Failed to update agent project_id: {e}. Continuing with in-memory update.")
# Fallback: continue with in-memory update
agent.project_id = project_id
return agent
async def get_or_create_claude_code_agent(
server,
actor,
project_id: str = None,
):
"""
Get or create a special agent for Claude Code sessions.
Args:
server: SyncServer instance
actor: Actor performing the operation (user ID)
project_id: Optional project ID to associate the agent with
Returns:
Agent ID
"""
from letta.schemas.agent import CreateAgent
# Create short user identifier from UUID (first 8 chars)
if actor:
user_short_id = str(actor.id)[:8] if hasattr(actor, "id") else str(actor)[:8]
else:
user_short_id = "default"
agent_name = f"claude-code-{user_short_id}"
try:
# Try to find existing agent by name (most reliable)
# Note: Search by name only, not tags, since name is unique and more reliable
logger.debug(f"Searching for agent with name: {agent_name}")
agents = await server.agent_manager.list_agents_async(
actor=actor,
limit=10, # Get a few in case of duplicates
name=agent_name,
include=["agent.blocks", "agent.managed_group", "agent.tags"],
)
# list_agents_async returns a list directly, not an object with .agents
logger.debug(f"Agent search returned {len(agents) if agents else 0} results")
if agents and len(agents) > 0:
# Return the first matching agent
logger.info(f"Found existing Claude Code agent: {agents[0].id} (name: {agent_name})")
agent = agents[0]
# Temporary patch: Fix project_id if it's missing (legacy bug)
# TODO(@caren): Remove this after all existing Claude Code agents have been backfilled
if not agent.project_id and project_id:
logger.info(f"[Backfill] Agent {agent.id} missing project_id, backfilling with {project_id}")
agent = await backfill_agent_project_id(server, agent, actor, project_id)
return agent
else:
logger.debug(f"No existing agent found with name: {agent_name}")
except Exception as e:
logger.warning(f"Could not find existing agent: {e}", exc_info=True)
# Create new agent
try:
logger.info(f"Creating new Claude Code agent: {agent_name} with project_id: {project_id}")
# Create minimal agent config
agent_config = CreateAgent(
name=agent_name,
description="Agent for capturing Claude Code conversations",
memory_blocks=[
{
"label": "human",
"value": "This is my section of core memory devoted to information about the human.\nI don't yet know anything about them.\nWhat's their name? Where are they from? What do they do? Who are they?\nI should update this memory over time as I interact with the human and learn more about them.",
"description": "A memory block for keeping track of the human (user) the agent is interacting with.",
},
{
"label": "persona",
"value": "This is my section of core memory devoted to information myself.\nThere's nothing here yet.\nI should update this memory over time as I develop my personality.",
"description": "A memory block for storing the agent's core personality details and behavior profile.",
},
{
"label": "project",
"value": "This is my section of core memory devoted to information about what the agent is working on.\nI don't yet know anything about it.\nI should update this memory over time with high level understanding and learnings.",
"description": "A memory block for storing the information about the project the agent is working on.",
},
],
tags=["claude-code"],
enable_sleeptime=True,
agent_type="letta_v1_agent",
model="anthropic/claude-sonnet-4-5-20250929",
embedding="openai/text-embedding-ada-002",
project_id=project_id,
)
new_agent = await server.create_agent_async(
request=agent_config,
actor=actor,
)
logger.info(f"Created Claude Code agent {new_agent.name}: {new_agent.id}")
return new_agent
except Exception as e:
logger.exception(f"Failed to create Claude Code agent: {e}")
raise

View File

@@ -26,9 +26,11 @@ from letta.server.rest_api.routers.v1.tags import router as tags_router
from letta.server.rest_api.routers.v1.telemetry import router as telemetry_router
from letta.server.rest_api.routers.v1.tools import router as tools_router
from letta.server.rest_api.routers.v1.voice import router as voice_router
from letta.server.rest_api.routers.v1.zai import router as zai_router
ROUTERS = [
anthropic_router,
zai_router,
archives_router,
tools_router,
sources_router,

View File

@@ -7,282 +7,25 @@ from fastapi.responses import Response, StreamingResponse
from letta.log import get_logger
from letta.server.rest_api.dependencies import HeaderParams, get_headers, get_letta_server
from letta.server.rest_api.utils import (
capture_and_persist_messages,
from letta.server.rest_api.proxy_helpers import (
build_response_from_chunks,
check_for_duplicate_message,
extract_assistant_message,
extract_user_messages,
get_or_create_claude_code_agent,
inject_memory_context,
is_topic_detection_response,
persist_messages_background,
prepare_headers,
)
from letta.server.server import SyncServer
from letta.settings import model_settings
logger = get_logger(__name__)
router = APIRouter(prefix="/anthropic", tags=["anthropic"])
ANTHROPIC_API_BASE = "https://api.anthropic.com"
def extract_user_messages(body: bytes) -> list[str]:
messages = []
try:
request_data = json.loads(body)
messages = request_data.get("messages", [])
user_messages = []
for msg in messages:
if msg.get("role") == "user":
content = msg.get("content", "")
if isinstance(content, str):
user_messages.append(content)
elif isinstance(content, list):
for block in content:
if isinstance(block, dict):
if block.get("type") == "text":
user_messages.append(block.get("text", ""))
elif block.get("type") == "image":
user_messages.append("[IMAGE]")
return user_messages
except Exception as e:
logger.warning(f"[Anthropic Proxy] Failed to extract user messages from request {messages}: {e}")
return []
def extract_assistant_message(response_data: dict) -> str:
content_blocks = []
try:
content_blocks = response_data.get("content", [])
text_parts = []
for block in content_blocks:
if isinstance(block, dict) and block.get("type") == "text":
text_parts.append(block.get("text", ""))
return "\n".join(text_parts)
except Exception as e:
logger.warning(f"[Anthropic Proxy] Failed to extract assistant message from response {content_blocks}: {e}")
return ""
def is_topic_detection_response(message: str) -> bool:
"""
Check if the assistant message is a topic detection response (contains isNewTopic key).
These are Claude Code metadata responses that should not be persisted as conversation.
"""
try:
stripped = message.strip()
if stripped.startswith("{") and stripped.endswith("}"):
parsed = json.loads(stripped)
# Check for isNewTopic key which indicates topic detection
if "isNewTopic" in parsed:
return True
except (json.JSONDecodeError, AttributeError):
pass
return False
def prepare_anthropic_headers(request: Request) -> dict | None:
skip_headers = {
"host",
"connection",
"content-length",
"transfer-encoding",
"content-encoding",
"te",
"upgrade",
"proxy-authenticate",
"proxy-authorization",
"authorization",
}
anthropic_headers = {}
for key, value in request.headers.items():
if key.lower() not in skip_headers:
anthropic_headers[key] = value
# Fallback to letta's anthropic api key if not provided
if "x-api-key" not in anthropic_headers and "anthropic-api-key" not in anthropic_headers:
anthropic_api_key = model_settings.anthropic_api_key
if anthropic_api_key:
logger.info("[Anthropic Proxy] Falling back to Letta's anthropic api key instead of user's key")
anthropic_headers["x-api-key"] = anthropic_api_key
if "content-type" not in anthropic_headers:
anthropic_headers["content-type"] = "application/json"
return anthropic_headers
def format_memory_blocks(blocks, agent_id: str) -> str:
blocks_with_content = [block for block in blocks if block.value]
if not blocks_with_content:
return ""
memory_context = (
"<letta>\n"
"You have persistent memory powered by Letta that is maintained across conversations. "
"A background agent updates these memory blocks based on conversation content.\n"
"<memory_blocks>\n"
"The following memory blocks are currently engaged in your core memory unit:\n\n"
)
for idx, block in enumerate(blocks_with_content):
label = block.label or "block"
value = block.value or ""
desc = block.description or ""
chars_current = len(value)
limit = block.limit if block.limit is not None else 0
memory_context += f"<{label}>\n"
if desc:
memory_context += "<description>\n"
memory_context += f"{desc}\n"
memory_context += "</description>\n"
memory_context += "<metadata>\n"
memory_context += f"- chars_current={chars_current}\n"
memory_context += f"- chars_limit={limit}\n"
memory_context += "</metadata>\n"
memory_context += "<value>\n"
memory_context += f"{value}\n"
memory_context += "</value>\n"
memory_context += f"</{label}>\n"
if idx != len(blocks_with_content) - 1:
memory_context += "\n"
memory_context += "\n</memory_blocks>\n\n"
memory_context += (
"<memory_management>\n"
f"Users can view and edit their memory blocks at:\n"
f"https://app.letta.com/agents/{agent_id}\n\n"
"Share this link when users ask how to manage their memory, what you remember about them, or how to view, edit, or delete stored information.\n"
"</memory_management>\n\n"
"<documentation>\n"
"- Memory blocks: https://docs.letta.com/guides/agents/memory-blocks/index.md\n"
"- Full Letta documentation: https://docs.letta.com/llms.txt\n\n"
"Reference these when users ask how Letta memory works or want to learn more about the platform.\n"
"</documentation>\n"
"</letta>"
)
return memory_context
def _build_response_from_chunks(chunks: list[bytes]) -> str:
try:
text_parts = []
full_data = b"".join(chunks).decode("utf-8")
for line in full_data.split("\n"):
if line.startswith("data: "):
data_str = line[6:] # Remove "data: " prefix
if data_str.strip() in ["[DONE]", ""]:
continue
try:
event_data = json.loads(data_str)
event_type = event_data.get("type")
if event_type == "content_block_delta":
delta = event_data.get("delta", {})
if delta.get("type") == "text_delta":
text_parts.append(delta.get("text", ""))
except json.JSONDecodeError:
continue
return "".join(text_parts)
except Exception as e:
logger.warning(f"[Anthropic Proxy] Failed to build response from chunks: {e}")
return ""
async def _inject_memory_context(
server,
agent,
actor,
request_data: dict,
) -> dict:
try:
messages = request_data.get("messages", [])
if not messages:
return request_data
memory_context = format_memory_blocks(agent.blocks, agent.id)
if not memory_context:
logger.debug("[Anthropic Proxy] No memory blocks found, skipping memory injection")
return request_data
block_count = len([b for b in agent.blocks if b.value])
logger.info(f"[Anthropic Proxy] Injecting {block_count} memory block(s) into request")
# Inject into system prompt
modified_data = request_data.copy()
# Check if there's already a system prompt
# Anthropic API accepts system as either a string or list of content blocks
existing_system = modified_data.get("system", "")
# Handle both string and list system prompts
if isinstance(existing_system, list):
# If it's a list, prepend our context as a text block
modified_data["system"] = existing_system + [{"type": "text", "text": memory_context.rstrip()}]
elif existing_system:
# If it's a non-empty string, prepend our context
modified_data["system"] = memory_context + existing_system
else:
# No existing system prompt
modified_data["system"] = memory_context.rstrip()
# Fix max_tokens if using extended thinking
# Anthropic requires max_tokens > thinking.budget_tokens
if "thinking" in modified_data and isinstance(modified_data["thinking"], dict):
budget_tokens = modified_data["thinking"].get("budget_tokens", 0)
current_max_tokens = modified_data.get("max_tokens", 0)
if budget_tokens > 0 and current_max_tokens <= budget_tokens:
# Set max_tokens to budget_tokens + reasonable buffer for response
# Claude Code typically uses budget_tokens around 10000-20000
modified_data["max_tokens"] = budget_tokens + 4096
logger.info(
f"[Anthropic Proxy] Adjusted max_tokens from {current_max_tokens} to {modified_data['max_tokens']} (thinking.budget_tokens={budget_tokens})"
)
return modified_data
except Exception as e:
logger.exception(f"[Anthropic Proxy] Failed to inject memory context: {e}")
return request_data
async def _persist_messages_background(
server,
agent,
actor,
user_messages: list[str],
assistant_message: str,
model_name: str,
):
"""
Background task to persist messages without blocking the response.
This runs asynchronously after the response is returned to minimize latency.
"""
try:
result = await capture_and_persist_messages(
server=server,
agent=agent,
actor=actor,
user_messages=user_messages,
assistant_message=assistant_message,
model=model_name,
)
if result.get("success"):
logger.info(f"[Anthropic Proxy] Persisted messages: {result['messages_created']} messages saved")
else:
logger.debug(f"[Anthropic Proxy] Skipped persistence: {result.get('reason', 'unknown')}")
except Exception as e:
logger.error(f"[Anthropic Proxy] Failed to persist messages in background: {e}")
PROXY_NAME = "Anthropic Proxy"
@router.api_route("/v1/messages", methods=["POST"], operation_id="anthropic_messages_proxy", include_in_schema=False)
@@ -307,8 +50,8 @@ async def anthropic_messages_proxy(
# Get the request body
body = await request.body()
logger.info(f"[Anthropic Proxy] Proxying request to Anthropic Messages API: {ANTHROPIC_API_BASE}/v1/messages")
logger.debug(f"[Anthropic Proxy] Request body preview: {body[:200]}...")
logger.info(f"[{PROXY_NAME}] Proxying request to Anthropic Messages API: {ANTHROPIC_API_BASE}/v1/messages")
logger.debug(f"[{PROXY_NAME}] Request body preview: {body[:200]}...")
actor = await server.user_manager.get_actor_or_default_async(headers.actor_id)
@@ -322,11 +65,11 @@ async def anthropic_messages_proxy(
# Filter out system/metadata requests
user_messages = [s for s in user_messages if not s.startswith("<system-reminder>")]
if not user_messages:
logger.debug("[Anthropic Proxy] Skipping capture/memory for this turn")
logger.debug(f"[{PROXY_NAME}] Skipping capture/memory for this turn")
anthropic_headers = prepare_anthropic_headers(request)
anthropic_headers = prepare_headers(request, PROXY_NAME)
if not anthropic_headers:
logger.error("[Anthropic Proxy] No Anthropic API key found in headers or settings")
logger.error(f"[{PROXY_NAME}] No Anthropic API key found in headers or settings")
return Response(
content='{"error": {"type": "authentication_error", "message": "Anthropic API key required. Pass via anthropic-api-key or x-api-key header."}}',
status_code=401,
@@ -342,11 +85,11 @@ async def anthropic_messages_proxy(
model_name = request_data.get("model")
# Extract and remove project_id (internal use only, not for Anthropic API)
project_id = request_data.pop("project_id", None)
logger.debug(f"[Anthropic Proxy] Request is streaming: {is_streaming}")
logger.debug(f"[Anthropic Proxy] Model: {model_name}")
logger.debug(f"[Anthropic Proxy] Project ID: {project_id}")
logger.debug(f"[{PROXY_NAME}] Request is streaming: {is_streaming}")
logger.debug(f"[{PROXY_NAME}] Model: {model_name}")
logger.debug(f"[{PROXY_NAME}] Project ID: {project_id}")
except Exception as e:
logger.warning(f"[Anthropic Proxy] Failed to parse request body: {e}")
logger.warning(f"[{PROXY_NAME}] Failed to parse request body: {e}")
is_streaming = False
model_name = None
project_id = None
@@ -361,20 +104,21 @@ async def anthropic_messages_proxy(
actor=actor,
project_id=project_id,
)
logger.debug(f"[Anthropic Proxy] Using agent ID: {agent.id}")
logger.debug(f"[{PROXY_NAME}] Using agent ID: {agent.id}")
except Exception as e:
logger.error(f"[Anthropic Proxy] Failed to get/create agent: {e}")
logger.error(f"[{PROXY_NAME}] Failed to get/create agent: {e}")
# Inject memory context into request (skip for system requests)
# TODO: Optimize - skip memory injection on subsequent messages in same session
# TODO: Add caching layer to avoid duplicate memory searches
modified_body = body
if agent and request_data:
modified_request_data = await _inject_memory_context(
modified_request_data = await inject_memory_context(
server=server,
agent=agent,
actor=actor,
request_data=request_data,
proxy_name=PROXY_NAME,
)
# Re-encode the modified request
import json
@@ -405,7 +149,7 @@ async def anthropic_messages_proxy(
yield chunk
# After streaming is complete, extract and log assistant message
assistant_message = _build_response_from_chunks(collected_chunks)
assistant_message = build_response_from_chunks(collected_chunks)
if user_messages and assistant_message:
logger.info("=" * 70)
logger.info("📨 CAPTURED USER MESSAGE:")
@@ -418,46 +162,22 @@ async def anthropic_messages_proxy(
# Skip persisting topic detection responses (metadata, not conversation)
if is_topic_detection_response(assistant_message):
logger.debug("[Anthropic Proxy] Skipping persistence - topic detection response")
logger.debug(f"[{PROXY_NAME}] Skipping persistence - topic detection response")
# Persist messages to database (non-blocking, skip for system requests)
elif agent:
# Check for duplicate user messages before creating background task
# This prevents race conditions where multiple requests persist the same message
user_messages_to_persist = user_messages.copy() if user_messages else []
if user_messages_to_persist:
try:
from letta.schemas.enums import MessageRole
recent_messages = await server.message_manager.list_messages(
agent_id=agent.id,
actor=actor,
limit=5,
roles=[MessageRole.user],
ascending=False,
)
if recent_messages:
last_user_msg = recent_messages[0]
last_message_text = ""
if last_user_msg.content:
for content_block in last_user_msg.content:
if hasattr(content_block, "text"):
last_message_text += content_block.text
incoming_msg = user_messages_to_persist[-1]
if last_message_text and last_message_text == incoming_msg:
logger.info(f"[Anthropic Proxy] Skipping duplicate user message: {incoming_msg[:100]}...")
user_messages_to_persist = []
except Exception as e:
logger.warning(f"[Anthropic Proxy] Failed to check for duplicate messages: {e}")
user_messages_to_persist = await check_for_duplicate_message(server, agent, actor, user_messages, PROXY_NAME)
asyncio.create_task(
_persist_messages_background(
persist_messages_background(
server=server,
agent=agent,
actor=actor,
user_messages=user_messages_to_persist,
assistant_message=assistant_message,
model_name=model_name,
proxy_name=PROXY_NAME,
)
)
@@ -497,45 +217,25 @@ async def anthropic_messages_proxy(
# Skip persisting topic detection responses (metadata, not conversation)
if is_topic_detection_response(assistant_message):
logger.debug("[Anthropic Proxy] Skipping persistence - topic detection response")
logger.debug(f"[{PROXY_NAME}] Skipping persistence - topic detection response")
# Persist messages to database (non-blocking)
elif agent:
# Check for duplicate user messages before creating background task
user_messages_to_persist = user_messages.copy() if user_messages else []
if user_messages_to_persist:
try:
from letta.schemas.enums import MessageRole
recent_messages = await server.message_manager.list_messages(
agent_id=agent.id, actor=actor, limit=5, roles=[MessageRole.user]
)
if recent_messages:
last_user_msg = recent_messages[0]
last_message_text = ""
if last_user_msg.content:
for content_block in last_user_msg.content:
if hasattr(content_block, "text"):
last_message_text += content_block.text
incoming_msg = user_messages_to_persist[-1]
if last_message_text and last_message_text == incoming_msg:
logger.info(f"[Anthropic Proxy] Skipping duplicate user message: {incoming_msg[:100]}...")
user_messages_to_persist = []
except Exception as e:
logger.warning(f"[Anthropic Proxy] Failed to check for duplicate messages: {e}")
user_messages_to_persist = await check_for_duplicate_message(server, agent, actor, user_messages, PROXY_NAME)
asyncio.create_task(
_persist_messages_background(
persist_messages_background(
server=server,
agent=agent,
actor=actor,
user_messages=user_messages_to_persist,
assistant_message=assistant_message,
model_name=model_name,
proxy_name=PROXY_NAME,
)
)
except Exception as e:
logger.warning(f"[Anthropic Proxy] Failed to extract assistant response for logging: {e}")
logger.warning(f"[{PROXY_NAME}] Failed to extract assistant response for logging: {e}")
return Response(
content=response.content,
@@ -549,7 +249,7 @@ async def anthropic_messages_proxy(
)
except httpx.HTTPError as e:
logger.error(f"[Anthropic Proxy] Error proxying request to Anthropic API: {e}")
logger.error(f"[{PROXY_NAME}] Error proxying request to Anthropic API: {e}")
return Response(
content=f'{{"error": {{"type": "api_error", "message": "Failed to proxy request to Anthropic API: {str(e)}"}}}}',
status_code=500,
@@ -590,11 +290,11 @@ async def anthropic_catchall_proxy(
# Reconstruct the full path
path = f"v1/{endpoint}"
logger.info(f"[Anthropic Proxy] Proxying catch-all request: {request.method} /{path}")
logger.info(f"[{PROXY_NAME}] Proxying catch-all request: {request.method} /{path}")
anthropic_headers = prepare_anthropic_headers(request)
anthropic_headers = prepare_headers(request, PROXY_NAME)
if not anthropic_headers:
logger.error("[Anthropic Proxy] No Anthropic API key found in headers or settings")
logger.error(f"[{PROXY_NAME}] No Anthropic API key found in headers or settings")
return Response(
content='{"error": {"type": "authentication_error", "message": "Anthropic API key required"}}',
status_code=401,
@@ -623,7 +323,7 @@ async def anthropic_catchall_proxy(
)
except httpx.HTTPError as e:
logger.error(f"[Anthropic Proxy] Error proxying catch-all request to Anthropic API: {e}")
logger.error(f"[{PROXY_NAME}] Error proxying catch-all request to Anthropic API: {e}")
return Response(
content=f'{{"error": {{"type": "api_error", "message": "Failed to proxy request to Anthropic API: {str(e)}"}}}}',
status_code=500,

View File

@@ -0,0 +1,331 @@
import asyncio
import json
import httpx
from fastapi import APIRouter, Depends, Request
from fastapi.responses import Response, StreamingResponse
from letta.log import get_logger
from letta.server.rest_api.dependencies import HeaderParams, get_headers, get_letta_server
from letta.server.rest_api.proxy_helpers import (
build_response_from_chunks,
check_for_duplicate_message,
extract_assistant_message,
extract_user_messages,
get_or_create_claude_code_agent,
inject_memory_context,
is_topic_detection_response,
persist_messages_background,
prepare_headers,
)
from letta.server.server import SyncServer
logger = get_logger(__name__)
router = APIRouter(prefix="/zai", tags=["zai"])
ZAI_API_BASE = "https://api.z.ai/api/anthropic"
PROXY_NAME = "Z.ai Proxy"
@router.api_route("/v1/messages", methods=["POST"], operation_id="zai_messages_proxy", include_in_schema=False)
async def zai_messages_proxy(
request: Request,
server: SyncServer = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Proxy endpoint for Z.ai Messages API.
This endpoint forwards requests to the Z.ai API, allowing Claude Code CLI
to use Letta as a proxy by configuring anthropic_base_url.
Usage in Claude Code CLI settings.json:
{
"env": {
"ANTHROPIC_BASE_URL": "http://localhost:3000/v1/zai"
}
}
"""
# Get the request body
body = await request.body()
logger.info(f"[{PROXY_NAME}] Proxying request to Z.ai Messages API: {ZAI_API_BASE}/v1/messages")
logger.debug(f"[{PROXY_NAME}] Request body preview: {body[:200]}...")
actor = await server.user_manager.get_actor_or_default_async(headers.actor_id)
# Extract all user messages from request
all_user_messages = extract_user_messages(body)
# Only capture the LAST user message (the new one the user just sent)
# Claude Code sends full conversation history, but we only want to persist the new message
user_messages = [all_user_messages[-1]] if all_user_messages else []
# Filter out system/metadata requests
user_messages = [s for s in user_messages if not s.startswith("<system-reminder>")]
if not user_messages:
logger.debug(f"[{PROXY_NAME}] Skipping capture/memory for this turn")
zai_headers = prepare_headers(request, PROXY_NAME, use_bearer_auth=True)
if not zai_headers:
logger.error(f"[{PROXY_NAME}] No Anthropic API key found in headers or settings")
return Response(
content='{"error": {"type": "authentication_error", "message": "Anthropic API key required. Pass via anthropic-api-key or x-api-key header."}}',
status_code=401,
media_type="application/json",
)
# Check if this is a streaming request
try:
import json
request_data = json.loads(body)
is_streaming = request_data.get("stream", False)
model_name = request_data.get("model")
# Extract and remove project_id (internal use only, not for Z.ai API)
project_id = request_data.pop("project_id", None)
logger.debug(f"[{PROXY_NAME}] Request is streaming: {is_streaming}")
logger.debug(f"[{PROXY_NAME}] Model: {model_name}")
logger.debug(f"[{PROXY_NAME}] Project ID: {project_id}")
except Exception as e:
logger.warning(f"[{PROXY_NAME}] Failed to parse request body: {e}")
is_streaming = False
model_name = None
project_id = None
# Get or create agent for Claude Code session (skip for system requests)
# Note: Agent lookup and memory search are blocking operations before forwarding.
# Message persistence happens in the background after the response is returned.
agent = None
try:
agent = await get_or_create_claude_code_agent(
server=server,
actor=actor,
project_id=project_id,
)
logger.debug(f"[{PROXY_NAME}] Using agent ID: {agent.id}")
except Exception as e:
logger.error(f"[{PROXY_NAME}] Failed to get/create agent: {e}")
# Inject memory context into request (skip for system requests)
# TODO: Optimize - skip memory injection on subsequent messages in same session
# TODO: Add caching layer to avoid duplicate memory searches
modified_body = body
if agent and request_data:
modified_request_data = await inject_memory_context(
server=server,
agent=agent,
actor=actor,
request_data=request_data,
proxy_name=PROXY_NAME,
)
# Re-encode the modified request
import json
modified_body = json.dumps(modified_request_data).encode("utf-8")
# Forward the request to Z.ai API (preserve query params like ?beta=true)
# Note: For streaming, we create the client outside the generator to keep it alive
zai_url = f"{ZAI_API_BASE}/v1/messages"
if request.url.query:
zai_url = f"{zai_url}?{request.url.query}"
if is_streaming:
# Handle streaming response
collected_chunks = []
async def stream_response():
# Create client inside the generator so it stays alive during streaming
async with httpx.AsyncClient(timeout=300.0) as client:
async with client.stream(
"POST",
zai_url,
headers=zai_headers,
content=modified_body,
) as response:
async for chunk in response.aiter_bytes():
collected_chunks.append(chunk)
yield chunk
# After streaming is complete, extract and log assistant message
assistant_message = build_response_from_chunks(collected_chunks)
if user_messages and assistant_message:
logger.info("=" * 70)
logger.info("📨 CAPTURED USER MESSAGE:")
for i, user_message in enumerate(user_messages):
logger.info(f" {i}: {user_message[:200]}{'...' if len(user_message) > 200 else ''}")
logger.info("=" * 70)
logger.info("🤖 CAPTURED ASSISTANT RESPONSE (streaming):")
logger.info(f" {assistant_message[:200]}{'...' if len(assistant_message) > 200 else ''}")
logger.info("=" * 70)
# Skip persisting topic detection responses (metadata, not conversation)
if is_topic_detection_response(assistant_message):
logger.debug(f"[{PROXY_NAME}] Skipping persistence - topic detection response")
# Persist messages to database (non-blocking, skip for system requests)
elif agent:
# Check for duplicate user messages before creating background task
# This prevents race conditions where multiple requests persist the same message
user_messages_to_persist = await check_for_duplicate_message(server, agent, actor, user_messages, PROXY_NAME)
asyncio.create_task(
persist_messages_background(
server=server,
agent=agent,
actor=actor,
user_messages=user_messages_to_persist,
assistant_message=assistant_message,
model_name=model_name,
proxy_name=PROXY_NAME,
)
)
return StreamingResponse(
stream_response(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
)
# Non-streaming path
async with httpx.AsyncClient(timeout=300.0) as client:
try:
# Handle non-streaming response
response = await client.post(
zai_url,
headers=zai_headers,
content=modified_body,
)
logger.info(f"Successfully proxied request, status: {response.status_code}")
# Extract and log assistant message
if response.status_code == 200:
try:
import json
response_data = json.loads(response.content)
assistant_message = extract_assistant_message(response_data)
if assistant_message:
logger.info("=" * 70)
logger.info("🤖 CAPTURED ASSISTANT RESPONSE:")
logger.info(f" {assistant_message[:500]}{'...' if len(assistant_message) > 500 else ''}")
logger.info("=" * 70)
# Skip persisting topic detection responses (metadata, not conversation)
if is_topic_detection_response(assistant_message):
logger.debug(f"[{PROXY_NAME}] Skipping persistence - topic detection response")
# Persist messages to database (non-blocking)
elif agent:
# Check for duplicate user messages before creating background task
user_messages_to_persist = await check_for_duplicate_message(server, agent, actor, user_messages, PROXY_NAME)
asyncio.create_task(
persist_messages_background(
server=server,
agent=agent,
actor=actor,
user_messages=user_messages_to_persist,
assistant_message=assistant_message,
model_name=model_name,
proxy_name=PROXY_NAME,
)
)
except Exception as e:
logger.warning(f"[{PROXY_NAME}] Failed to extract assistant response for logging: {e}")
return Response(
content=response.content,
status_code=response.status_code,
media_type=response.headers.get("content-type", "application/json"),
headers={
k: v
for k, v in response.headers.items()
if k.lower() not in ["content-encoding", "content-length", "transfer-encoding", "connection"]
},
)
except httpx.HTTPError as e:
logger.error(f"[{PROXY_NAME}] Error proxying request to Z.ai API: {e}")
return Response(
content=f'{{"error": {{"type": "api_error", "message": "Failed to proxy request to Z.ai API: {str(e)}"}}}}',
status_code=500,
media_type="application/json",
)
@router.api_route(
"/v1/{endpoint:path}",
methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
operation_id="zai_catchall_proxy",
include_in_schema=False,
)
async def zai_catchall_proxy(
endpoint: str,
request: Request,
server: SyncServer = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Catch-all proxy for other Z.ai API endpoints.
This forwards all other requests (like /v1/messages/count_tokens) directly to Z.ai
without message capture or memory injection.
"""
# Skip the /v1/messages endpoint (handled by specific route)
if endpoint == "messages" and request.method == "POST":
# This should be handled by the specific route, but just in case return error
return Response(
content='{"error": {"type": "routing_error", "message": "Use specific /v1/messages endpoint"}}',
status_code=500,
media_type="application/json",
)
# Get the request body
body = await request.body()
# Reconstruct the full path
path = f"v1/{endpoint}"
logger.info(f"[{PROXY_NAME}] Proxying catch-all request: {request.method} /{path}")
zai_headers = prepare_headers(request, PROXY_NAME, use_bearer_auth=True)
if not zai_headers:
logger.error(f"[{PROXY_NAME}] No Anthropic API key found in headers or settings")
return Response(
content='{"error": {"type": "authentication_error", "message": "Anthropic API key required"}}',
status_code=401,
media_type="application/json",
)
# Forward the request to Z.ai API
async with httpx.AsyncClient(timeout=300.0) as client:
try:
response = await client.request(
method=request.method,
url=f"{ZAI_API_BASE}/{path}",
headers=zai_headers,
content=body if body else None,
)
return Response(
content=response.content,
status_code=response.status_code,
media_type=response.headers.get("content-type", "application/json"),
headers={
k: v
for k, v in response.headers.items()
if k.lower() not in ["content-encoding", "content-length", "transfer-encoding", "connection"]
},
)
except httpx.HTTPError as e:
logger.error(f"[{PROXY_NAME}] Error proxying catch-all request to Z.ai API: {e}")
return Response(
content=f'{{"error": {{"type": "api_error", "message": "Failed to proxy request to Z.ai API: {str(e)}"}}}}',
status_code=500,
media_type="application/json",
)

View File

@@ -858,138 +858,3 @@ async def capture_and_persist_messages(
"messages_created": len(response_messages),
"run_ids": run_ids,
}
async def _backfill_agent_project_id(server, agent, actor, project_id: str):
"""
Temporary helper to backfill project_id for legacy agents.
TODO(@caren): Remove this function after all existing Claude Code agents have been backfilled.
Args:
server: SyncServer instance
agent: Agent to update
actor: Actor performing the operation
project_id: Project ID to set
Returns:
Updated agent or original agent if update fails
"""
from letta.schemas.agent import UpdateAgent
try:
updated_agent = await server.update_agent_async(
agent_id=agent.id,
request=UpdateAgent(project_id=project_id),
actor=actor,
)
logger.info(f"[Backfill] Successfully updated agent {agent.id} with project_id {project_id}")
return updated_agent
except Exception as e:
logger.warning(f"[Backfill] Failed to update agent project_id: {e}. Continuing with in-memory update.")
# Fallback: continue with in-memory update
agent.project_id = project_id
return agent
async def get_or_create_claude_code_agent(
server,
actor,
project_id: Optional[str] = None,
):
"""
Get or create a special agent for Claude Code sessions.
Args:
server: SyncServer instance
actor: Actor performing the operation (user ID)
project_id: Optional project ID to associate the agent with
Returns:
Agent ID
"""
from letta.schemas.agent import CreateAgent
# Create short user identifier from UUID (first 8 chars)
if actor:
user_short_id = str(actor.id)[:8] if hasattr(actor, "id") else str(actor)[:8]
else:
user_short_id = "default"
agent_name = f"claude-code-{user_short_id}"
try:
# Try to find existing agent by name (most reliable)
# Note: Search by name only, not tags, since name is unique and more reliable
logger.debug(f"Searching for agent with name: {agent_name}")
agents = await server.agent_manager.list_agents_async(
actor=actor,
limit=10, # Get a few in case of duplicates
name=agent_name,
include=["agent.blocks", "agent.managed_group", "agent.tags"],
)
# list_agents_async returns a list directly, not an object with .agents
logger.debug(f"Agent search returned {len(agents) if agents else 0} results")
if agents and len(agents) > 0:
# Return the first matching agent
logger.info(f"Found existing Claude Code agent: {agents[0].id} (name: {agent_name})")
agent = agents[0]
# Temporary patch: Fix project_id if it's missing (legacy bug)
# TODO(@caren): Remove this after all existing Claude Code agents have been backfilled
if not agent.project_id and project_id:
logger.info(f"[Backfill] Agent {agent.id} missing project_id, backfilling with {project_id}")
agent = await _backfill_agent_project_id(server, agent, actor, project_id)
return agent
else:
logger.debug(f"No existing agent found with name: {agent_name}")
except Exception as e:
logger.warning(f"Could not find existing agent: {e}", exc_info=True)
# Create new agent
try:
logger.info(f"Creating new Claude Code agent: {agent_name} with project_id: {project_id}")
# Create minimal agent config
agent_config = CreateAgent(
name=agent_name,
description="Agent for capturing Claude Code conversations",
memory_blocks=[
{
"label": "human",
"value": "This is my section of core memory devoted to information about the human.\nI don't yet know anything about them.\nWhat's their name? Where are they from? What do they do? Who are they?\nI should update this memory over time as I interact with the human and learn more about them.",
"description": "A memory block for keeping track of the human (user) the agent is interacting with.",
},
{
"label": "persona",
"value": "This is my section of core memory devoted to information myself.\nThere's nothing here yet.\nI should update this memory over time as I develop my personality.",
"description": "A memory block for storing the agent's core personality details and behavior profile.",
},
{
"label": "project",
"value": "This is my section of core memory devoted to information about what the agent is working on.\nI don't yet know anything about it.\nI should update this memory over time with high level understanding and learnings.",
"description": "A memory block for storing the information about the project the agent is working on.",
},
],
tags=["claude-code"],
enable_sleeptime=True,
agent_type="letta_v1_agent",
model="anthropic/claude-sonnet-4-5-20250929",
embedding="openai/text-embedding-ada-002",
project_id=project_id,
)
new_agent = await server.create_agent_async(
request=agent_config,
actor=actor,
)
logger.info(f"Created Claude Code agent {new_agent.name}: {new_agent.id}")
return new_agent
except Exception as e:
logger.exception(f"Failed to create Claude Code agent: {e}")
raise