feat: skip persisting redundant messages for proxy (#6819)
This commit is contained in:
@@ -63,6 +63,23 @@ def extract_assistant_message(response_data: dict) -> str:
|
||||
return ""
|
||||
|
||||
|
||||
def is_topic_detection_response(message: str) -> bool:
|
||||
"""
|
||||
Check if the assistant message is a topic detection response (contains isNewTopic key).
|
||||
These are Claude Code metadata responses that should not be persisted as conversation.
|
||||
"""
|
||||
try:
|
||||
stripped = message.strip()
|
||||
if stripped.startswith("{") and stripped.endswith("}"):
|
||||
parsed = json.loads(stripped)
|
||||
# Check for isNewTopic key which indicates topic detection
|
||||
if "isNewTopic" in parsed:
|
||||
return True
|
||||
except (json.JSONDecodeError, AttributeError):
|
||||
pass
|
||||
return False
|
||||
|
||||
|
||||
def prepare_anthropic_headers(request: Request) -> dict | None:
|
||||
skip_headers = {
|
||||
"host",
|
||||
@@ -260,7 +277,10 @@ async def _persist_messages_background(
|
||||
assistant_message=assistant_message,
|
||||
model=model_name,
|
||||
)
|
||||
logger.info(f"[Anthropic Proxy] Persisted messages: {result['messages_created']} messages saved")
|
||||
if result.get("success"):
|
||||
logger.info(f"[Anthropic Proxy] Persisted messages: {result['messages_created']} messages saved")
|
||||
else:
|
||||
logger.debug(f"[Anthropic Proxy] Skipped persistence: {result.get('reason', 'unknown')}")
|
||||
except Exception as e:
|
||||
logger.error(f"[Anthropic Proxy] Failed to persist messages in background: {e}")
|
||||
|
||||
@@ -361,8 +381,12 @@ async def anthropic_messages_proxy(
|
||||
|
||||
modified_body = json.dumps(modified_request_data).encode("utf-8")
|
||||
|
||||
# Forward the request to Anthropic API
|
||||
# Forward the request to Anthropic API (preserve query params like ?beta=true)
|
||||
# Note: For streaming, we create the client outside the generator to keep it alive
|
||||
anthropic_url = f"{ANTHROPIC_API_BASE}/v1/messages"
|
||||
if request.url.query:
|
||||
anthropic_url = f"{anthropic_url}?{request.url.query}"
|
||||
|
||||
if is_streaming:
|
||||
# Handle streaming response
|
||||
collected_chunks = []
|
||||
@@ -372,7 +396,7 @@ async def anthropic_messages_proxy(
|
||||
async with httpx.AsyncClient(timeout=300.0) as client:
|
||||
async with client.stream(
|
||||
"POST",
|
||||
f"{ANTHROPIC_API_BASE}/v1/messages",
|
||||
anthropic_url,
|
||||
headers=anthropic_headers,
|
||||
content=modified_body,
|
||||
) as response:
|
||||
@@ -385,21 +409,53 @@ async def anthropic_messages_proxy(
|
||||
if user_messages and assistant_message:
|
||||
logger.info("=" * 70)
|
||||
logger.info("📨 CAPTURED USER MESSAGE:")
|
||||
for user_message in user_messages:
|
||||
logger.info(f" {user_message[:200]}{'...' if len(user_message) > 200 else ''}")
|
||||
for i, user_message in enumerate(user_messages):
|
||||
logger.info(f" {i}: {user_message[:200]}{'...' if len(user_message) > 200 else ''}")
|
||||
logger.info("=" * 70)
|
||||
logger.info("🤖 CAPTURED ASSISTANT RESPONSE (streaming):")
|
||||
logger.info(f" {assistant_message[:200]}{'...' if len(assistant_message) > 200 else ''}")
|
||||
logger.info("=" * 70)
|
||||
|
||||
# Skip persisting topic detection responses (metadata, not conversation)
|
||||
if is_topic_detection_response(assistant_message):
|
||||
logger.debug("[Anthropic Proxy] Skipping persistence - topic detection response")
|
||||
# Persist messages to database (non-blocking, skip for system requests)
|
||||
if agent and user_messages:
|
||||
elif agent:
|
||||
# Check for duplicate user messages before creating background task
|
||||
# This prevents race conditions where multiple requests persist the same message
|
||||
user_messages_to_persist = user_messages.copy() if user_messages else []
|
||||
if user_messages_to_persist:
|
||||
try:
|
||||
from letta.schemas.enums import MessageRole
|
||||
|
||||
recent_messages = await server.message_manager.list_messages(
|
||||
agent_id=agent.id,
|
||||
actor=actor,
|
||||
limit=5,
|
||||
roles=[MessageRole.user],
|
||||
ascending=False,
|
||||
)
|
||||
if recent_messages:
|
||||
last_user_msg = recent_messages[0]
|
||||
last_message_text = ""
|
||||
if last_user_msg.content:
|
||||
for content_block in last_user_msg.content:
|
||||
if hasattr(content_block, "text"):
|
||||
last_message_text += content_block.text
|
||||
|
||||
incoming_msg = user_messages_to_persist[-1]
|
||||
if last_message_text and last_message_text == incoming_msg:
|
||||
logger.info(f"[Anthropic Proxy] Skipping duplicate user message: {incoming_msg[:100]}...")
|
||||
user_messages_to_persist = []
|
||||
except Exception as e:
|
||||
logger.warning(f"[Anthropic Proxy] Failed to check for duplicate messages: {e}")
|
||||
|
||||
asyncio.create_task(
|
||||
_persist_messages_background(
|
||||
server=server,
|
||||
agent=agent,
|
||||
actor=actor,
|
||||
user_messages=user_messages,
|
||||
user_messages=user_messages_to_persist,
|
||||
assistant_message=assistant_message,
|
||||
model_name=model_name,
|
||||
)
|
||||
@@ -419,7 +475,7 @@ async def anthropic_messages_proxy(
|
||||
try:
|
||||
# Handle non-streaming response
|
||||
response = await client.post(
|
||||
f"{ANTHROPIC_API_BASE}/v1/messages",
|
||||
anthropic_url,
|
||||
headers=anthropic_headers,
|
||||
content=modified_body,
|
||||
)
|
||||
@@ -439,14 +495,41 @@ async def anthropic_messages_proxy(
|
||||
logger.info(f" {assistant_message[:500]}{'...' if len(assistant_message) > 500 else ''}")
|
||||
logger.info("=" * 70)
|
||||
|
||||
# Skip persisting topic detection responses (metadata, not conversation)
|
||||
if is_topic_detection_response(assistant_message):
|
||||
logger.debug("[Anthropic Proxy] Skipping persistence - topic detection response")
|
||||
# Persist messages to database (non-blocking)
|
||||
if agent and user_messages:
|
||||
elif agent:
|
||||
# Check for duplicate user messages before creating background task
|
||||
user_messages_to_persist = user_messages.copy() if user_messages else []
|
||||
if user_messages_to_persist:
|
||||
try:
|
||||
from letta.schemas.enums import MessageRole
|
||||
|
||||
recent_messages = await server.message_manager.list_messages(
|
||||
agent_id=agent.id, actor=actor, limit=5, roles=[MessageRole.user]
|
||||
)
|
||||
if recent_messages:
|
||||
last_user_msg = recent_messages[0]
|
||||
last_message_text = ""
|
||||
if last_user_msg.content:
|
||||
for content_block in last_user_msg.content:
|
||||
if hasattr(content_block, "text"):
|
||||
last_message_text += content_block.text
|
||||
|
||||
incoming_msg = user_messages_to_persist[-1]
|
||||
if last_message_text and last_message_text == incoming_msg:
|
||||
logger.info(f"[Anthropic Proxy] Skipping duplicate user message: {incoming_msg[:100]}...")
|
||||
user_messages_to_persist = []
|
||||
except Exception as e:
|
||||
logger.warning(f"[Anthropic Proxy] Failed to check for duplicate messages: {e}")
|
||||
|
||||
asyncio.create_task(
|
||||
_persist_messages_background(
|
||||
server=server,
|
||||
agent=agent,
|
||||
actor=actor,
|
||||
user_messages=user_messages,
|
||||
user_messages=user_messages_to_persist,
|
||||
assistant_message=assistant_message,
|
||||
model_name=model_name,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user