From efac48e9ea45336447c8e8f3e6e283d1381dbd23 Mon Sep 17 00:00:00 2001 From: cthomas Date: Fri, 12 Dec 2025 16:24:48 -0800 Subject: [PATCH] feat: add zai proxy LET-6543 (#6836) feat: add zai proxy --- letta/server/rest_api/proxy_helpers.py | 518 ++++++++++++++++++ letta/server/rest_api/routers/v1/__init__.py | 2 + letta/server/rest_api/routers/v1/anthropic.py | 376 ++----------- letta/server/rest_api/routers/v1/zai.py | 331 +++++++++++ letta/server/rest_api/utils.py | 135 ----- 5 files changed, 889 insertions(+), 473 deletions(-) create mode 100644 letta/server/rest_api/proxy_helpers.py create mode 100644 letta/server/rest_api/routers/v1/zai.py diff --git a/letta/server/rest_api/proxy_helpers.py b/letta/server/rest_api/proxy_helpers.py new file mode 100644 index 00000000..8f56cca6 --- /dev/null +++ b/letta/server/rest_api/proxy_helpers.py @@ -0,0 +1,518 @@ +""" +Shared helper functions for Anthropic-compatible proxy endpoints. + +These helpers are used by both the Anthropic and Z.ai proxy routers to reduce code duplication. +""" + +import asyncio +import json + +from fastapi import Request + +from letta.log import get_logger +from letta.server.rest_api.utils import capture_and_persist_messages +from letta.settings import model_settings + +logger = get_logger(__name__) + + +def extract_user_messages(body: bytes) -> list[str]: + """Extract user messages from request body.""" + messages = [] + try: + request_data = json.loads(body) + messages = request_data.get("messages", []) + + user_messages = [] + for msg in messages: + if msg.get("role") == "user": + content = msg.get("content", "") + if isinstance(content, str): + user_messages.append(content) + elif isinstance(content, list): + for block in content: + if isinstance(block, dict): + if block.get("type") == "text": + user_messages.append(block.get("text", "")) + elif block.get("type") == "image": + user_messages.append("[IMAGE]") + + return user_messages + except Exception as e: + logger.warning(f"[Proxy Helpers] Failed to extract user messages from request {messages}: {e}") + return [] + + +def extract_assistant_message(response_data: dict) -> str: + """Extract assistant message from response data.""" + content_blocks = [] + try: + content_blocks = response_data.get("content", []) + text_parts = [] + + for block in content_blocks: + if isinstance(block, dict) and block.get("type") == "text": + text_parts.append(block.get("text", "")) + + return "\n".join(text_parts) + except Exception as e: + logger.warning(f"[Proxy Helpers] Failed to extract assistant message from response {content_blocks}: {e}") + return "" + + +def is_topic_detection_response(message: str) -> bool: + """ + Check if the assistant message is a topic detection response (contains isNewTopic key). + These are Claude Code metadata responses that should not be persisted as conversation. + """ + try: + stripped = message.strip() + if stripped.startswith("{") and stripped.endswith("}"): + parsed = json.loads(stripped) + # Check for isNewTopic key which indicates topic detection + if "isNewTopic" in parsed: + return True + except (json.JSONDecodeError, AttributeError): + pass + return False + + +def prepare_headers(request: Request, proxy_name: str, use_bearer_auth: bool = False) -> dict | None: + """ + Prepare headers for forwarding to Anthropic-compatible API. + + Args: + request: The incoming FastAPI request + proxy_name: Name of the proxy for logging (e.g., "Anthropic Proxy", "Z.ai Proxy") + use_bearer_auth: If True, convert x-api-key to Bearer token in Authorization header (for Z.ai) + + Returns: + Dictionary of headers to forward, or None if authentication fails + """ + skip_headers = { + "host", + "connection", + "content-length", + "transfer-encoding", + "content-encoding", + "te", + "upgrade", + "proxy-authenticate", + "proxy-authorization", + "authorization", + } + + headers = {} + for key, value in request.headers.items(): + if key.lower() not in skip_headers: + headers[key] = value + + # Extract API key from headers or fallback to letta's key + api_key = None + if "x-api-key" in headers: + api_key = headers["x-api-key"] + elif "anthropic-api-key" in headers: + api_key = headers["anthropic-api-key"] + else: + # Fallback to letta's anthropic api key if not provided + api_key = model_settings.anthropic_api_key + if api_key: + logger.info(f"[{proxy_name}] Falling back to Letta's anthropic api key instead of user's key") + + # Handle authentication based on proxy type + if use_bearer_auth: + # Z.ai: use Bearer token in Authorization header + if api_key: + headers["authorization"] = f"Bearer {api_key}" + # Keep x-api-key in headers too (doesn't hurt) + if "x-api-key" not in headers and api_key: + headers["x-api-key"] = api_key + else: + # Anthropic: use x-api-key header + if api_key and "x-api-key" not in headers: + headers["x-api-key"] = api_key + + if "content-type" not in headers: + headers["content-type"] = "application/json" + + return headers + + +def format_memory_blocks(blocks, agent_id: str) -> str: + """Format memory blocks for injection into system prompt.""" + blocks_with_content = [block for block in blocks if block.value] + + if not blocks_with_content: + return "" + + memory_context = ( + "\n" + "You have persistent memory powered by Letta that is maintained across conversations. " + "A background agent updates these memory blocks based on conversation content.\n" + "\n" + "The following memory blocks are currently engaged in your core memory unit:\n\n" + ) + + for idx, block in enumerate(blocks_with_content): + label = block.label or "block" + value = block.value or "" + desc = block.description or "" + chars_current = len(value) + limit = block.limit if block.limit is not None else 0 + + memory_context += f"<{label}>\n" + if desc: + memory_context += "\n" + memory_context += f"{desc}\n" + memory_context += "\n" + memory_context += "\n" + memory_context += f"- chars_current={chars_current}\n" + memory_context += f"- chars_limit={limit}\n" + memory_context += "\n" + memory_context += "\n" + memory_context += f"{value}\n" + memory_context += "\n" + memory_context += f"\n" + + if idx != len(blocks_with_content) - 1: + memory_context += "\n" + + memory_context += "\n\n\n" + memory_context += ( + "\n" + f"Users can view and edit their memory blocks at:\n" + f"https://app.letta.com/agents/{agent_id}\n\n" + "Share this link when users ask how to manage their memory, what you remember about them, or how to view, edit, or delete stored information.\n" + "\n\n" + "\n" + "- Memory blocks: https://docs.letta.com/guides/agents/memory-blocks/index.md\n" + "- Full Letta documentation: https://docs.letta.com/llms.txt\n\n" + "Reference these when users ask how Letta memory works or want to learn more about the platform.\n" + "\n" + "" + ) + return memory_context + + +def build_response_from_chunks(chunks: list[bytes]) -> str: + """Build complete response text from streaming chunks.""" + try: + text_parts = [] + full_data = b"".join(chunks).decode("utf-8") + + for line in full_data.split("\n"): + if line.startswith("data: "): + data_str = line[6:] # Remove "data: " prefix + + if data_str.strip() in ["[DONE]", ""]: + continue + + try: + event_data = json.loads(data_str) + event_type = event_data.get("type") + + if event_type == "content_block_delta": + delta = event_data.get("delta", {}) + if delta.get("type") == "text_delta": + text_parts.append(delta.get("text", "")) + except json.JSONDecodeError: + continue + + return "".join(text_parts) + except Exception as e: + logger.warning(f"[Proxy Helpers] Failed to build response from chunks: {e}") + return "" + + +async def inject_memory_context( + server, + agent, + actor, + request_data: dict, + proxy_name: str, +) -> dict: + """ + Inject memory context into the request system prompt. + + Args: + server: SyncServer instance + agent: Agent to get memory from + actor: Actor performing the operation + request_data: Request data dictionary to modify + proxy_name: Name of the proxy for logging (e.g., "Anthropic Proxy", "Z.ai Proxy") + + Returns: + Modified request data with memory context injected + """ + try: + messages = request_data.get("messages", []) + if not messages: + return request_data + + memory_context = format_memory_blocks(agent.blocks, agent.id) + + if not memory_context: + logger.debug(f"[{proxy_name}] No memory blocks found, skipping memory injection") + return request_data + + block_count = len([b for b in agent.blocks if b.value]) + logger.info(f"[{proxy_name}] Injecting {block_count} memory block(s) into request") + + # Inject into system prompt + modified_data = request_data.copy() + + # Check if there's already a system prompt + # Anthropic API accepts system as either a string or list of content blocks + existing_system = modified_data.get("system", "") + + # Handle both string and list system prompts + if isinstance(existing_system, list): + # If it's a list, prepend our context as a text block + modified_data["system"] = existing_system + [{"type": "text", "text": memory_context.rstrip()}] + elif existing_system: + # If it's a non-empty string, prepend our context + modified_data["system"] = memory_context + existing_system + else: + # No existing system prompt + modified_data["system"] = memory_context.rstrip() + + # Fix max_tokens if using extended thinking + # Anthropic requires max_tokens > thinking.budget_tokens + if "thinking" in modified_data and isinstance(modified_data["thinking"], dict): + budget_tokens = modified_data["thinking"].get("budget_tokens", 0) + current_max_tokens = modified_data.get("max_tokens", 0) + + if budget_tokens > 0 and current_max_tokens <= budget_tokens: + # Set max_tokens to budget_tokens + reasonable buffer for response + # Claude Code typically uses budget_tokens around 10000-20000 + modified_data["max_tokens"] = budget_tokens + 4096 + logger.info( + f"[{proxy_name}] Adjusted max_tokens from {current_max_tokens} to {modified_data['max_tokens']} (thinking.budget_tokens={budget_tokens})" + ) + + return modified_data + + except Exception as e: + logger.exception(f"[{proxy_name}] Failed to inject memory context: {e}") + return request_data + + +async def persist_messages_background( + server, + agent, + actor, + user_messages: list[str], + assistant_message: str, + model_name: str, + proxy_name: str, +): + """ + Background task to persist messages without blocking the response. + + This runs asynchronously after the response is returned to minimize latency. + + Args: + server: SyncServer instance + agent: Agent to persist messages for + actor: Actor performing the operation + user_messages: List of user messages to persist + assistant_message: Assistant message to persist + model_name: Model name for the messages + proxy_name: Name of the proxy for logging (e.g., "Anthropic Proxy", "Z.ai Proxy") + """ + try: + result = await capture_and_persist_messages( + server=server, + agent=agent, + actor=actor, + user_messages=user_messages, + assistant_message=assistant_message, + model=model_name, + ) + if result.get("success"): + logger.info(f"[{proxy_name}] Persisted messages: {result['messages_created']} messages saved") + else: + logger.debug(f"[{proxy_name}] Skipped persistence: {result.get('reason', 'unknown')}") + except Exception as e: + logger.error(f"[{proxy_name}] Failed to persist messages in background: {e}") + + +async def check_for_duplicate_message(server, agent, actor, user_messages: list[str], proxy_name: str) -> list[str]: + """ + Check if the last user message is a duplicate of the most recent persisted message. + + Returns a filtered list with duplicates removed to prevent race conditions. + + Args: + server: SyncServer instance + agent: Agent to check messages for + actor: Actor performing the operation + user_messages: List of user messages to check + proxy_name: Name of the proxy for logging + + Returns: + Filtered list of user messages (empty if duplicate detected) + """ + user_messages_to_persist = user_messages.copy() if user_messages else [] + if user_messages_to_persist: + try: + from letta.schemas.enums import MessageRole + + recent_messages = await server.message_manager.list_messages( + agent_id=agent.id, + actor=actor, + limit=5, + roles=[MessageRole.user], + ascending=False, + ) + if recent_messages: + last_user_msg = recent_messages[0] + last_message_text = "" + if last_user_msg.content: + for content_block in last_user_msg.content: + if hasattr(content_block, "text"): + last_message_text += content_block.text + + incoming_msg = user_messages_to_persist[-1] + if last_message_text and last_message_text == incoming_msg: + logger.info(f"[{proxy_name}] Skipping duplicate user message: {incoming_msg[:100]}...") + user_messages_to_persist = [] + except Exception as e: + logger.warning(f"[{proxy_name}] Failed to check for duplicate messages: {e}") + + return user_messages_to_persist + + +async def backfill_agent_project_id(server, agent, actor, project_id: str): + """ + Temporary helper to backfill project_id for legacy agents. + + TODO(@caren): Remove this function after all existing Claude Code agents have been backfilled. + + Args: + server: SyncServer instance + agent: Agent to update + actor: Actor performing the operation + project_id: Project ID to set + + Returns: + Updated agent or original agent if update fails + """ + from letta.schemas.agent import UpdateAgent + + try: + updated_agent = await server.update_agent_async( + agent_id=agent.id, + request=UpdateAgent(project_id=project_id), + actor=actor, + ) + logger.info(f"[Backfill] Successfully updated agent {agent.id} with project_id {project_id}") + return updated_agent + except Exception as e: + logger.warning(f"[Backfill] Failed to update agent project_id: {e}. Continuing with in-memory update.") + # Fallback: continue with in-memory update + agent.project_id = project_id + return agent + + +async def get_or_create_claude_code_agent( + server, + actor, + project_id: str = None, +): + """ + Get or create a special agent for Claude Code sessions. + + Args: + server: SyncServer instance + actor: Actor performing the operation (user ID) + project_id: Optional project ID to associate the agent with + + Returns: + Agent ID + """ + from letta.schemas.agent import CreateAgent + + # Create short user identifier from UUID (first 8 chars) + if actor: + user_short_id = str(actor.id)[:8] if hasattr(actor, "id") else str(actor)[:8] + else: + user_short_id = "default" + + agent_name = f"claude-code-{user_short_id}" + + try: + # Try to find existing agent by name (most reliable) + # Note: Search by name only, not tags, since name is unique and more reliable + logger.debug(f"Searching for agent with name: {agent_name}") + agents = await server.agent_manager.list_agents_async( + actor=actor, + limit=10, # Get a few in case of duplicates + name=agent_name, + include=["agent.blocks", "agent.managed_group", "agent.tags"], + ) + + # list_agents_async returns a list directly, not an object with .agents + logger.debug(f"Agent search returned {len(agents) if agents else 0} results") + if agents and len(agents) > 0: + # Return the first matching agent + logger.info(f"Found existing Claude Code agent: {agents[0].id} (name: {agent_name})") + agent = agents[0] + + # Temporary patch: Fix project_id if it's missing (legacy bug) + # TODO(@caren): Remove this after all existing Claude Code agents have been backfilled + if not agent.project_id and project_id: + logger.info(f"[Backfill] Agent {agent.id} missing project_id, backfilling with {project_id}") + agent = await backfill_agent_project_id(server, agent, actor, project_id) + + return agent + else: + logger.debug(f"No existing agent found with name: {agent_name}") + + except Exception as e: + logger.warning(f"Could not find existing agent: {e}", exc_info=True) + + # Create new agent + try: + logger.info(f"Creating new Claude Code agent: {agent_name} with project_id: {project_id}") + + # Create minimal agent config + agent_config = CreateAgent( + name=agent_name, + description="Agent for capturing Claude Code conversations", + memory_blocks=[ + { + "label": "human", + "value": "This is my section of core memory devoted to information about the human.\nI don't yet know anything about them.\nWhat's their name? Where are they from? What do they do? Who are they?\nI should update this memory over time as I interact with the human and learn more about them.", + "description": "A memory block for keeping track of the human (user) the agent is interacting with.", + }, + { + "label": "persona", + "value": "This is my section of core memory devoted to information myself.\nThere's nothing here yet.\nI should update this memory over time as I develop my personality.", + "description": "A memory block for storing the agent's core personality details and behavior profile.", + }, + { + "label": "project", + "value": "This is my section of core memory devoted to information about what the agent is working on.\nI don't yet know anything about it.\nI should update this memory over time with high level understanding and learnings.", + "description": "A memory block for storing the information about the project the agent is working on.", + }, + ], + tags=["claude-code"], + enable_sleeptime=True, + agent_type="letta_v1_agent", + model="anthropic/claude-sonnet-4-5-20250929", + embedding="openai/text-embedding-ada-002", + project_id=project_id, + ) + + new_agent = await server.create_agent_async( + request=agent_config, + actor=actor, + ) + + logger.info(f"Created Claude Code agent {new_agent.name}: {new_agent.id}") + return new_agent + + except Exception as e: + logger.exception(f"Failed to create Claude Code agent: {e}") + raise diff --git a/letta/server/rest_api/routers/v1/__init__.py b/letta/server/rest_api/routers/v1/__init__.py index 0e20956a..919675f7 100644 --- a/letta/server/rest_api/routers/v1/__init__.py +++ b/letta/server/rest_api/routers/v1/__init__.py @@ -26,9 +26,11 @@ from letta.server.rest_api.routers.v1.tags import router as tags_router from letta.server.rest_api.routers.v1.telemetry import router as telemetry_router from letta.server.rest_api.routers.v1.tools import router as tools_router from letta.server.rest_api.routers.v1.voice import router as voice_router +from letta.server.rest_api.routers.v1.zai import router as zai_router ROUTERS = [ anthropic_router, + zai_router, archives_router, tools_router, sources_router, diff --git a/letta/server/rest_api/routers/v1/anthropic.py b/letta/server/rest_api/routers/v1/anthropic.py index e804fdca..1171d272 100644 --- a/letta/server/rest_api/routers/v1/anthropic.py +++ b/letta/server/rest_api/routers/v1/anthropic.py @@ -7,282 +7,25 @@ from fastapi.responses import Response, StreamingResponse from letta.log import get_logger from letta.server.rest_api.dependencies import HeaderParams, get_headers, get_letta_server -from letta.server.rest_api.utils import ( - capture_and_persist_messages, +from letta.server.rest_api.proxy_helpers import ( + build_response_from_chunks, + check_for_duplicate_message, + extract_assistant_message, + extract_user_messages, get_or_create_claude_code_agent, + inject_memory_context, + is_topic_detection_response, + persist_messages_background, + prepare_headers, ) from letta.server.server import SyncServer -from letta.settings import model_settings logger = get_logger(__name__) router = APIRouter(prefix="/anthropic", tags=["anthropic"]) ANTHROPIC_API_BASE = "https://api.anthropic.com" - - -def extract_user_messages(body: bytes) -> list[str]: - messages = [] - try: - request_data = json.loads(body) - messages = request_data.get("messages", []) - - user_messages = [] - for msg in messages: - if msg.get("role") == "user": - content = msg.get("content", "") - if isinstance(content, str): - user_messages.append(content) - elif isinstance(content, list): - for block in content: - if isinstance(block, dict): - if block.get("type") == "text": - user_messages.append(block.get("text", "")) - elif block.get("type") == "image": - user_messages.append("[IMAGE]") - - return user_messages - except Exception as e: - logger.warning(f"[Anthropic Proxy] Failed to extract user messages from request {messages}: {e}") - return [] - - -def extract_assistant_message(response_data: dict) -> str: - content_blocks = [] - try: - content_blocks = response_data.get("content", []) - text_parts = [] - - for block in content_blocks: - if isinstance(block, dict) and block.get("type") == "text": - text_parts.append(block.get("text", "")) - - return "\n".join(text_parts) - except Exception as e: - logger.warning(f"[Anthropic Proxy] Failed to extract assistant message from response {content_blocks}: {e}") - return "" - - -def is_topic_detection_response(message: str) -> bool: - """ - Check if the assistant message is a topic detection response (contains isNewTopic key). - These are Claude Code metadata responses that should not be persisted as conversation. - """ - try: - stripped = message.strip() - if stripped.startswith("{") and stripped.endswith("}"): - parsed = json.loads(stripped) - # Check for isNewTopic key which indicates topic detection - if "isNewTopic" in parsed: - return True - except (json.JSONDecodeError, AttributeError): - pass - return False - - -def prepare_anthropic_headers(request: Request) -> dict | None: - skip_headers = { - "host", - "connection", - "content-length", - "transfer-encoding", - "content-encoding", - "te", - "upgrade", - "proxy-authenticate", - "proxy-authorization", - "authorization", - } - - anthropic_headers = {} - for key, value in request.headers.items(): - if key.lower() not in skip_headers: - anthropic_headers[key] = value - - # Fallback to letta's anthropic api key if not provided - if "x-api-key" not in anthropic_headers and "anthropic-api-key" not in anthropic_headers: - anthropic_api_key = model_settings.anthropic_api_key - if anthropic_api_key: - logger.info("[Anthropic Proxy] Falling back to Letta's anthropic api key instead of user's key") - anthropic_headers["x-api-key"] = anthropic_api_key - - if "content-type" not in anthropic_headers: - anthropic_headers["content-type"] = "application/json" - - return anthropic_headers - - -def format_memory_blocks(blocks, agent_id: str) -> str: - blocks_with_content = [block for block in blocks if block.value] - - if not blocks_with_content: - return "" - - memory_context = ( - "\n" - "You have persistent memory powered by Letta that is maintained across conversations. " - "A background agent updates these memory blocks based on conversation content.\n" - "\n" - "The following memory blocks are currently engaged in your core memory unit:\n\n" - ) - - for idx, block in enumerate(blocks_with_content): - label = block.label or "block" - value = block.value or "" - desc = block.description or "" - chars_current = len(value) - limit = block.limit if block.limit is not None else 0 - - memory_context += f"<{label}>\n" - if desc: - memory_context += "\n" - memory_context += f"{desc}\n" - memory_context += "\n" - memory_context += "\n" - memory_context += f"- chars_current={chars_current}\n" - memory_context += f"- chars_limit={limit}\n" - memory_context += "\n" - memory_context += "\n" - memory_context += f"{value}\n" - memory_context += "\n" - memory_context += f"\n" - - if idx != len(blocks_with_content) - 1: - memory_context += "\n" - - memory_context += "\n\n\n" - memory_context += ( - "\n" - f"Users can view and edit their memory blocks at:\n" - f"https://app.letta.com/agents/{agent_id}\n\n" - "Share this link when users ask how to manage their memory, what you remember about them, or how to view, edit, or delete stored information.\n" - "\n\n" - "\n" - "- Memory blocks: https://docs.letta.com/guides/agents/memory-blocks/index.md\n" - "- Full Letta documentation: https://docs.letta.com/llms.txt\n\n" - "Reference these when users ask how Letta memory works or want to learn more about the platform.\n" - "\n" - "" - ) - return memory_context - - -def _build_response_from_chunks(chunks: list[bytes]) -> str: - try: - text_parts = [] - full_data = b"".join(chunks).decode("utf-8") - - for line in full_data.split("\n"): - if line.startswith("data: "): - data_str = line[6:] # Remove "data: " prefix - - if data_str.strip() in ["[DONE]", ""]: - continue - - try: - event_data = json.loads(data_str) - event_type = event_data.get("type") - - if event_type == "content_block_delta": - delta = event_data.get("delta", {}) - if delta.get("type") == "text_delta": - text_parts.append(delta.get("text", "")) - except json.JSONDecodeError: - continue - - return "".join(text_parts) - except Exception as e: - logger.warning(f"[Anthropic Proxy] Failed to build response from chunks: {e}") - return "" - - -async def _inject_memory_context( - server, - agent, - actor, - request_data: dict, -) -> dict: - try: - messages = request_data.get("messages", []) - if not messages: - return request_data - - memory_context = format_memory_blocks(agent.blocks, agent.id) - - if not memory_context: - logger.debug("[Anthropic Proxy] No memory blocks found, skipping memory injection") - return request_data - - block_count = len([b for b in agent.blocks if b.value]) - logger.info(f"[Anthropic Proxy] Injecting {block_count} memory block(s) into request") - - # Inject into system prompt - modified_data = request_data.copy() - - # Check if there's already a system prompt - # Anthropic API accepts system as either a string or list of content blocks - existing_system = modified_data.get("system", "") - - # Handle both string and list system prompts - if isinstance(existing_system, list): - # If it's a list, prepend our context as a text block - modified_data["system"] = existing_system + [{"type": "text", "text": memory_context.rstrip()}] - elif existing_system: - # If it's a non-empty string, prepend our context - modified_data["system"] = memory_context + existing_system - else: - # No existing system prompt - modified_data["system"] = memory_context.rstrip() - - # Fix max_tokens if using extended thinking - # Anthropic requires max_tokens > thinking.budget_tokens - if "thinking" in modified_data and isinstance(modified_data["thinking"], dict): - budget_tokens = modified_data["thinking"].get("budget_tokens", 0) - current_max_tokens = modified_data.get("max_tokens", 0) - - if budget_tokens > 0 and current_max_tokens <= budget_tokens: - # Set max_tokens to budget_tokens + reasonable buffer for response - # Claude Code typically uses budget_tokens around 10000-20000 - modified_data["max_tokens"] = budget_tokens + 4096 - logger.info( - f"[Anthropic Proxy] Adjusted max_tokens from {current_max_tokens} to {modified_data['max_tokens']} (thinking.budget_tokens={budget_tokens})" - ) - - return modified_data - - except Exception as e: - logger.exception(f"[Anthropic Proxy] Failed to inject memory context: {e}") - return request_data - - -async def _persist_messages_background( - server, - agent, - actor, - user_messages: list[str], - assistant_message: str, - model_name: str, -): - """ - Background task to persist messages without blocking the response. - - This runs asynchronously after the response is returned to minimize latency. - """ - try: - result = await capture_and_persist_messages( - server=server, - agent=agent, - actor=actor, - user_messages=user_messages, - assistant_message=assistant_message, - model=model_name, - ) - if result.get("success"): - logger.info(f"[Anthropic Proxy] Persisted messages: {result['messages_created']} messages saved") - else: - logger.debug(f"[Anthropic Proxy] Skipped persistence: {result.get('reason', 'unknown')}") - except Exception as e: - logger.error(f"[Anthropic Proxy] Failed to persist messages in background: {e}") +PROXY_NAME = "Anthropic Proxy" @router.api_route("/v1/messages", methods=["POST"], operation_id="anthropic_messages_proxy", include_in_schema=False) @@ -307,8 +50,8 @@ async def anthropic_messages_proxy( # Get the request body body = await request.body() - logger.info(f"[Anthropic Proxy] Proxying request to Anthropic Messages API: {ANTHROPIC_API_BASE}/v1/messages") - logger.debug(f"[Anthropic Proxy] Request body preview: {body[:200]}...") + logger.info(f"[{PROXY_NAME}] Proxying request to Anthropic Messages API: {ANTHROPIC_API_BASE}/v1/messages") + logger.debug(f"[{PROXY_NAME}] Request body preview: {body[:200]}...") actor = await server.user_manager.get_actor_or_default_async(headers.actor_id) @@ -322,11 +65,11 @@ async def anthropic_messages_proxy( # Filter out system/metadata requests user_messages = [s for s in user_messages if not s.startswith("")] if not user_messages: - logger.debug("[Anthropic Proxy] Skipping capture/memory for this turn") + logger.debug(f"[{PROXY_NAME}] Skipping capture/memory for this turn") - anthropic_headers = prepare_anthropic_headers(request) + anthropic_headers = prepare_headers(request, PROXY_NAME) if not anthropic_headers: - logger.error("[Anthropic Proxy] No Anthropic API key found in headers or settings") + logger.error(f"[{PROXY_NAME}] No Anthropic API key found in headers or settings") return Response( content='{"error": {"type": "authentication_error", "message": "Anthropic API key required. Pass via anthropic-api-key or x-api-key header."}}', status_code=401, @@ -342,11 +85,11 @@ async def anthropic_messages_proxy( model_name = request_data.get("model") # Extract and remove project_id (internal use only, not for Anthropic API) project_id = request_data.pop("project_id", None) - logger.debug(f"[Anthropic Proxy] Request is streaming: {is_streaming}") - logger.debug(f"[Anthropic Proxy] Model: {model_name}") - logger.debug(f"[Anthropic Proxy] Project ID: {project_id}") + logger.debug(f"[{PROXY_NAME}] Request is streaming: {is_streaming}") + logger.debug(f"[{PROXY_NAME}] Model: {model_name}") + logger.debug(f"[{PROXY_NAME}] Project ID: {project_id}") except Exception as e: - logger.warning(f"[Anthropic Proxy] Failed to parse request body: {e}") + logger.warning(f"[{PROXY_NAME}] Failed to parse request body: {e}") is_streaming = False model_name = None project_id = None @@ -361,20 +104,21 @@ async def anthropic_messages_proxy( actor=actor, project_id=project_id, ) - logger.debug(f"[Anthropic Proxy] Using agent ID: {agent.id}") + logger.debug(f"[{PROXY_NAME}] Using agent ID: {agent.id}") except Exception as e: - logger.error(f"[Anthropic Proxy] Failed to get/create agent: {e}") + logger.error(f"[{PROXY_NAME}] Failed to get/create agent: {e}") # Inject memory context into request (skip for system requests) # TODO: Optimize - skip memory injection on subsequent messages in same session # TODO: Add caching layer to avoid duplicate memory searches modified_body = body if agent and request_data: - modified_request_data = await _inject_memory_context( + modified_request_data = await inject_memory_context( server=server, agent=agent, actor=actor, request_data=request_data, + proxy_name=PROXY_NAME, ) # Re-encode the modified request import json @@ -405,7 +149,7 @@ async def anthropic_messages_proxy( yield chunk # After streaming is complete, extract and log assistant message - assistant_message = _build_response_from_chunks(collected_chunks) + assistant_message = build_response_from_chunks(collected_chunks) if user_messages and assistant_message: logger.info("=" * 70) logger.info("📨 CAPTURED USER MESSAGE:") @@ -418,46 +162,22 @@ async def anthropic_messages_proxy( # Skip persisting topic detection responses (metadata, not conversation) if is_topic_detection_response(assistant_message): - logger.debug("[Anthropic Proxy] Skipping persistence - topic detection response") + logger.debug(f"[{PROXY_NAME}] Skipping persistence - topic detection response") # Persist messages to database (non-blocking, skip for system requests) elif agent: # Check for duplicate user messages before creating background task # This prevents race conditions where multiple requests persist the same message - user_messages_to_persist = user_messages.copy() if user_messages else [] - if user_messages_to_persist: - try: - from letta.schemas.enums import MessageRole - - recent_messages = await server.message_manager.list_messages( - agent_id=agent.id, - actor=actor, - limit=5, - roles=[MessageRole.user], - ascending=False, - ) - if recent_messages: - last_user_msg = recent_messages[0] - last_message_text = "" - if last_user_msg.content: - for content_block in last_user_msg.content: - if hasattr(content_block, "text"): - last_message_text += content_block.text - - incoming_msg = user_messages_to_persist[-1] - if last_message_text and last_message_text == incoming_msg: - logger.info(f"[Anthropic Proxy] Skipping duplicate user message: {incoming_msg[:100]}...") - user_messages_to_persist = [] - except Exception as e: - logger.warning(f"[Anthropic Proxy] Failed to check for duplicate messages: {e}") + user_messages_to_persist = await check_for_duplicate_message(server, agent, actor, user_messages, PROXY_NAME) asyncio.create_task( - _persist_messages_background( + persist_messages_background( server=server, agent=agent, actor=actor, user_messages=user_messages_to_persist, assistant_message=assistant_message, model_name=model_name, + proxy_name=PROXY_NAME, ) ) @@ -497,45 +217,25 @@ async def anthropic_messages_proxy( # Skip persisting topic detection responses (metadata, not conversation) if is_topic_detection_response(assistant_message): - logger.debug("[Anthropic Proxy] Skipping persistence - topic detection response") + logger.debug(f"[{PROXY_NAME}] Skipping persistence - topic detection response") # Persist messages to database (non-blocking) elif agent: # Check for duplicate user messages before creating background task - user_messages_to_persist = user_messages.copy() if user_messages else [] - if user_messages_to_persist: - try: - from letta.schemas.enums import MessageRole - - recent_messages = await server.message_manager.list_messages( - agent_id=agent.id, actor=actor, limit=5, roles=[MessageRole.user] - ) - if recent_messages: - last_user_msg = recent_messages[0] - last_message_text = "" - if last_user_msg.content: - for content_block in last_user_msg.content: - if hasattr(content_block, "text"): - last_message_text += content_block.text - - incoming_msg = user_messages_to_persist[-1] - if last_message_text and last_message_text == incoming_msg: - logger.info(f"[Anthropic Proxy] Skipping duplicate user message: {incoming_msg[:100]}...") - user_messages_to_persist = [] - except Exception as e: - logger.warning(f"[Anthropic Proxy] Failed to check for duplicate messages: {e}") + user_messages_to_persist = await check_for_duplicate_message(server, agent, actor, user_messages, PROXY_NAME) asyncio.create_task( - _persist_messages_background( + persist_messages_background( server=server, agent=agent, actor=actor, user_messages=user_messages_to_persist, assistant_message=assistant_message, model_name=model_name, + proxy_name=PROXY_NAME, ) ) except Exception as e: - logger.warning(f"[Anthropic Proxy] Failed to extract assistant response for logging: {e}") + logger.warning(f"[{PROXY_NAME}] Failed to extract assistant response for logging: {e}") return Response( content=response.content, @@ -549,7 +249,7 @@ async def anthropic_messages_proxy( ) except httpx.HTTPError as e: - logger.error(f"[Anthropic Proxy] Error proxying request to Anthropic API: {e}") + logger.error(f"[{PROXY_NAME}] Error proxying request to Anthropic API: {e}") return Response( content=f'{{"error": {{"type": "api_error", "message": "Failed to proxy request to Anthropic API: {str(e)}"}}}}', status_code=500, @@ -590,11 +290,11 @@ async def anthropic_catchall_proxy( # Reconstruct the full path path = f"v1/{endpoint}" - logger.info(f"[Anthropic Proxy] Proxying catch-all request: {request.method} /{path}") + logger.info(f"[{PROXY_NAME}] Proxying catch-all request: {request.method} /{path}") - anthropic_headers = prepare_anthropic_headers(request) + anthropic_headers = prepare_headers(request, PROXY_NAME) if not anthropic_headers: - logger.error("[Anthropic Proxy] No Anthropic API key found in headers or settings") + logger.error(f"[{PROXY_NAME}] No Anthropic API key found in headers or settings") return Response( content='{"error": {"type": "authentication_error", "message": "Anthropic API key required"}}', status_code=401, @@ -623,7 +323,7 @@ async def anthropic_catchall_proxy( ) except httpx.HTTPError as e: - logger.error(f"[Anthropic Proxy] Error proxying catch-all request to Anthropic API: {e}") + logger.error(f"[{PROXY_NAME}] Error proxying catch-all request to Anthropic API: {e}") return Response( content=f'{{"error": {{"type": "api_error", "message": "Failed to proxy request to Anthropic API: {str(e)}"}}}}', status_code=500, diff --git a/letta/server/rest_api/routers/v1/zai.py b/letta/server/rest_api/routers/v1/zai.py new file mode 100644 index 00000000..7b63a99c --- /dev/null +++ b/letta/server/rest_api/routers/v1/zai.py @@ -0,0 +1,331 @@ +import asyncio +import json + +import httpx +from fastapi import APIRouter, Depends, Request +from fastapi.responses import Response, StreamingResponse + +from letta.log import get_logger +from letta.server.rest_api.dependencies import HeaderParams, get_headers, get_letta_server +from letta.server.rest_api.proxy_helpers import ( + build_response_from_chunks, + check_for_duplicate_message, + extract_assistant_message, + extract_user_messages, + get_or_create_claude_code_agent, + inject_memory_context, + is_topic_detection_response, + persist_messages_background, + prepare_headers, +) +from letta.server.server import SyncServer + +logger = get_logger(__name__) + +router = APIRouter(prefix="/zai", tags=["zai"]) + +ZAI_API_BASE = "https://api.z.ai/api/anthropic" +PROXY_NAME = "Z.ai Proxy" + + +@router.api_route("/v1/messages", methods=["POST"], operation_id="zai_messages_proxy", include_in_schema=False) +async def zai_messages_proxy( + request: Request, + server: SyncServer = Depends(get_letta_server), + headers: HeaderParams = Depends(get_headers), +): + """ + Proxy endpoint for Z.ai Messages API. + + This endpoint forwards requests to the Z.ai API, allowing Claude Code CLI + to use Letta as a proxy by configuring anthropic_base_url. + + Usage in Claude Code CLI settings.json: + { + "env": { + "ANTHROPIC_BASE_URL": "http://localhost:3000/v1/zai" + } + } + """ + # Get the request body + body = await request.body() + + logger.info(f"[{PROXY_NAME}] Proxying request to Z.ai Messages API: {ZAI_API_BASE}/v1/messages") + logger.debug(f"[{PROXY_NAME}] Request body preview: {body[:200]}...") + + actor = await server.user_manager.get_actor_or_default_async(headers.actor_id) + + # Extract all user messages from request + all_user_messages = extract_user_messages(body) + + # Only capture the LAST user message (the new one the user just sent) + # Claude Code sends full conversation history, but we only want to persist the new message + user_messages = [all_user_messages[-1]] if all_user_messages else [] + + # Filter out system/metadata requests + user_messages = [s for s in user_messages if not s.startswith("")] + if not user_messages: + logger.debug(f"[{PROXY_NAME}] Skipping capture/memory for this turn") + + zai_headers = prepare_headers(request, PROXY_NAME, use_bearer_auth=True) + if not zai_headers: + logger.error(f"[{PROXY_NAME}] No Anthropic API key found in headers or settings") + return Response( + content='{"error": {"type": "authentication_error", "message": "Anthropic API key required. Pass via anthropic-api-key or x-api-key header."}}', + status_code=401, + media_type="application/json", + ) + + # Check if this is a streaming request + try: + import json + + request_data = json.loads(body) + is_streaming = request_data.get("stream", False) + model_name = request_data.get("model") + # Extract and remove project_id (internal use only, not for Z.ai API) + project_id = request_data.pop("project_id", None) + logger.debug(f"[{PROXY_NAME}] Request is streaming: {is_streaming}") + logger.debug(f"[{PROXY_NAME}] Model: {model_name}") + logger.debug(f"[{PROXY_NAME}] Project ID: {project_id}") + except Exception as e: + logger.warning(f"[{PROXY_NAME}] Failed to parse request body: {e}") + is_streaming = False + model_name = None + project_id = None + + # Get or create agent for Claude Code session (skip for system requests) + # Note: Agent lookup and memory search are blocking operations before forwarding. + # Message persistence happens in the background after the response is returned. + agent = None + try: + agent = await get_or_create_claude_code_agent( + server=server, + actor=actor, + project_id=project_id, + ) + logger.debug(f"[{PROXY_NAME}] Using agent ID: {agent.id}") + except Exception as e: + logger.error(f"[{PROXY_NAME}] Failed to get/create agent: {e}") + + # Inject memory context into request (skip for system requests) + # TODO: Optimize - skip memory injection on subsequent messages in same session + # TODO: Add caching layer to avoid duplicate memory searches + modified_body = body + if agent and request_data: + modified_request_data = await inject_memory_context( + server=server, + agent=agent, + actor=actor, + request_data=request_data, + proxy_name=PROXY_NAME, + ) + # Re-encode the modified request + import json + + modified_body = json.dumps(modified_request_data).encode("utf-8") + + # Forward the request to Z.ai API (preserve query params like ?beta=true) + # Note: For streaming, we create the client outside the generator to keep it alive + zai_url = f"{ZAI_API_BASE}/v1/messages" + if request.url.query: + zai_url = f"{zai_url}?{request.url.query}" + + if is_streaming: + # Handle streaming response + collected_chunks = [] + + async def stream_response(): + # Create client inside the generator so it stays alive during streaming + async with httpx.AsyncClient(timeout=300.0) as client: + async with client.stream( + "POST", + zai_url, + headers=zai_headers, + content=modified_body, + ) as response: + async for chunk in response.aiter_bytes(): + collected_chunks.append(chunk) + yield chunk + + # After streaming is complete, extract and log assistant message + assistant_message = build_response_from_chunks(collected_chunks) + if user_messages and assistant_message: + logger.info("=" * 70) + logger.info("📨 CAPTURED USER MESSAGE:") + for i, user_message in enumerate(user_messages): + logger.info(f" {i}: {user_message[:200]}{'...' if len(user_message) > 200 else ''}") + logger.info("=" * 70) + logger.info("🤖 CAPTURED ASSISTANT RESPONSE (streaming):") + logger.info(f" {assistant_message[:200]}{'...' if len(assistant_message) > 200 else ''}") + logger.info("=" * 70) + + # Skip persisting topic detection responses (metadata, not conversation) + if is_topic_detection_response(assistant_message): + logger.debug(f"[{PROXY_NAME}] Skipping persistence - topic detection response") + # Persist messages to database (non-blocking, skip for system requests) + elif agent: + # Check for duplicate user messages before creating background task + # This prevents race conditions where multiple requests persist the same message + user_messages_to_persist = await check_for_duplicate_message(server, agent, actor, user_messages, PROXY_NAME) + + asyncio.create_task( + persist_messages_background( + server=server, + agent=agent, + actor=actor, + user_messages=user_messages_to_persist, + assistant_message=assistant_message, + model_name=model_name, + proxy_name=PROXY_NAME, + ) + ) + + return StreamingResponse( + stream_response(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + }, + ) + + # Non-streaming path + async with httpx.AsyncClient(timeout=300.0) as client: + try: + # Handle non-streaming response + response = await client.post( + zai_url, + headers=zai_headers, + content=modified_body, + ) + + logger.info(f"Successfully proxied request, status: {response.status_code}") + + # Extract and log assistant message + if response.status_code == 200: + try: + import json + + response_data = json.loads(response.content) + assistant_message = extract_assistant_message(response_data) + if assistant_message: + logger.info("=" * 70) + logger.info("🤖 CAPTURED ASSISTANT RESPONSE:") + logger.info(f" {assistant_message[:500]}{'...' if len(assistant_message) > 500 else ''}") + logger.info("=" * 70) + + # Skip persisting topic detection responses (metadata, not conversation) + if is_topic_detection_response(assistant_message): + logger.debug(f"[{PROXY_NAME}] Skipping persistence - topic detection response") + # Persist messages to database (non-blocking) + elif agent: + # Check for duplicate user messages before creating background task + user_messages_to_persist = await check_for_duplicate_message(server, agent, actor, user_messages, PROXY_NAME) + + asyncio.create_task( + persist_messages_background( + server=server, + agent=agent, + actor=actor, + user_messages=user_messages_to_persist, + assistant_message=assistant_message, + model_name=model_name, + proxy_name=PROXY_NAME, + ) + ) + except Exception as e: + logger.warning(f"[{PROXY_NAME}] Failed to extract assistant response for logging: {e}") + + return Response( + content=response.content, + status_code=response.status_code, + media_type=response.headers.get("content-type", "application/json"), + headers={ + k: v + for k, v in response.headers.items() + if k.lower() not in ["content-encoding", "content-length", "transfer-encoding", "connection"] + }, + ) + + except httpx.HTTPError as e: + logger.error(f"[{PROXY_NAME}] Error proxying request to Z.ai API: {e}") + return Response( + content=f'{{"error": {{"type": "api_error", "message": "Failed to proxy request to Z.ai API: {str(e)}"}}}}', + status_code=500, + media_type="application/json", + ) + + +@router.api_route( + "/v1/{endpoint:path}", + methods=["GET", "POST", "PUT", "DELETE", "PATCH"], + operation_id="zai_catchall_proxy", + include_in_schema=False, +) +async def zai_catchall_proxy( + endpoint: str, + request: Request, + server: SyncServer = Depends(get_letta_server), + headers: HeaderParams = Depends(get_headers), +): + """ + Catch-all proxy for other Z.ai API endpoints. + + This forwards all other requests (like /v1/messages/count_tokens) directly to Z.ai + without message capture or memory injection. + """ + # Skip the /v1/messages endpoint (handled by specific route) + if endpoint == "messages" and request.method == "POST": + # This should be handled by the specific route, but just in case return error + return Response( + content='{"error": {"type": "routing_error", "message": "Use specific /v1/messages endpoint"}}', + status_code=500, + media_type="application/json", + ) + + # Get the request body + body = await request.body() + + # Reconstruct the full path + path = f"v1/{endpoint}" + + logger.info(f"[{PROXY_NAME}] Proxying catch-all request: {request.method} /{path}") + + zai_headers = prepare_headers(request, PROXY_NAME, use_bearer_auth=True) + if not zai_headers: + logger.error(f"[{PROXY_NAME}] No Anthropic API key found in headers or settings") + return Response( + content='{"error": {"type": "authentication_error", "message": "Anthropic API key required"}}', + status_code=401, + media_type="application/json", + ) + + # Forward the request to Z.ai API + async with httpx.AsyncClient(timeout=300.0) as client: + try: + response = await client.request( + method=request.method, + url=f"{ZAI_API_BASE}/{path}", + headers=zai_headers, + content=body if body else None, + ) + + return Response( + content=response.content, + status_code=response.status_code, + media_type=response.headers.get("content-type", "application/json"), + headers={ + k: v + for k, v in response.headers.items() + if k.lower() not in ["content-encoding", "content-length", "transfer-encoding", "connection"] + }, + ) + + except httpx.HTTPError as e: + logger.error(f"[{PROXY_NAME}] Error proxying catch-all request to Z.ai API: {e}") + return Response( + content=f'{{"error": {{"type": "api_error", "message": "Failed to proxy request to Z.ai API: {str(e)}"}}}}', + status_code=500, + media_type="application/json", + ) diff --git a/letta/server/rest_api/utils.py b/letta/server/rest_api/utils.py index e5ec8e7e..25186f8e 100644 --- a/letta/server/rest_api/utils.py +++ b/letta/server/rest_api/utils.py @@ -858,138 +858,3 @@ async def capture_and_persist_messages( "messages_created": len(response_messages), "run_ids": run_ids, } - - -async def _backfill_agent_project_id(server, agent, actor, project_id: str): - """ - Temporary helper to backfill project_id for legacy agents. - - TODO(@caren): Remove this function after all existing Claude Code agents have been backfilled. - - Args: - server: SyncServer instance - agent: Agent to update - actor: Actor performing the operation - project_id: Project ID to set - - Returns: - Updated agent or original agent if update fails - """ - from letta.schemas.agent import UpdateAgent - - try: - updated_agent = await server.update_agent_async( - agent_id=agent.id, - request=UpdateAgent(project_id=project_id), - actor=actor, - ) - logger.info(f"[Backfill] Successfully updated agent {agent.id} with project_id {project_id}") - return updated_agent - except Exception as e: - logger.warning(f"[Backfill] Failed to update agent project_id: {e}. Continuing with in-memory update.") - # Fallback: continue with in-memory update - agent.project_id = project_id - return agent - - -async def get_or_create_claude_code_agent( - server, - actor, - project_id: Optional[str] = None, -): - """ - Get or create a special agent for Claude Code sessions. - - Args: - server: SyncServer instance - actor: Actor performing the operation (user ID) - project_id: Optional project ID to associate the agent with - - Returns: - Agent ID - """ - from letta.schemas.agent import CreateAgent - - # Create short user identifier from UUID (first 8 chars) - if actor: - user_short_id = str(actor.id)[:8] if hasattr(actor, "id") else str(actor)[:8] - else: - user_short_id = "default" - - agent_name = f"claude-code-{user_short_id}" - - try: - # Try to find existing agent by name (most reliable) - # Note: Search by name only, not tags, since name is unique and more reliable - logger.debug(f"Searching for agent with name: {agent_name}") - agents = await server.agent_manager.list_agents_async( - actor=actor, - limit=10, # Get a few in case of duplicates - name=agent_name, - include=["agent.blocks", "agent.managed_group", "agent.tags"], - ) - - # list_agents_async returns a list directly, not an object with .agents - logger.debug(f"Agent search returned {len(agents) if agents else 0} results") - if agents and len(agents) > 0: - # Return the first matching agent - logger.info(f"Found existing Claude Code agent: {agents[0].id} (name: {agent_name})") - agent = agents[0] - - # Temporary patch: Fix project_id if it's missing (legacy bug) - # TODO(@caren): Remove this after all existing Claude Code agents have been backfilled - if not agent.project_id and project_id: - logger.info(f"[Backfill] Agent {agent.id} missing project_id, backfilling with {project_id}") - agent = await _backfill_agent_project_id(server, agent, actor, project_id) - - return agent - else: - logger.debug(f"No existing agent found with name: {agent_name}") - - except Exception as e: - logger.warning(f"Could not find existing agent: {e}", exc_info=True) - - # Create new agent - try: - logger.info(f"Creating new Claude Code agent: {agent_name} with project_id: {project_id}") - - # Create minimal agent config - agent_config = CreateAgent( - name=agent_name, - description="Agent for capturing Claude Code conversations", - memory_blocks=[ - { - "label": "human", - "value": "This is my section of core memory devoted to information about the human.\nI don't yet know anything about them.\nWhat's their name? Where are they from? What do they do? Who are they?\nI should update this memory over time as I interact with the human and learn more about them.", - "description": "A memory block for keeping track of the human (user) the agent is interacting with.", - }, - { - "label": "persona", - "value": "This is my section of core memory devoted to information myself.\nThere's nothing here yet.\nI should update this memory over time as I develop my personality.", - "description": "A memory block for storing the agent's core personality details and behavior profile.", - }, - { - "label": "project", - "value": "This is my section of core memory devoted to information about what the agent is working on.\nI don't yet know anything about it.\nI should update this memory over time with high level understanding and learnings.", - "description": "A memory block for storing the information about the project the agent is working on.", - }, - ], - tags=["claude-code"], - enable_sleeptime=True, - agent_type="letta_v1_agent", - model="anthropic/claude-sonnet-4-5-20250929", - embedding="openai/text-embedding-ada-002", - project_id=project_id, - ) - - new_agent = await server.create_agent_async( - request=agent_config, - actor=actor, - ) - - logger.info(f"Created Claude Code agent {new_agent.name}: {new_agent.id}") - return new_agent - - except Exception as e: - logger.exception(f"Failed to create Claude Code agent: {e}") - raise