fix: handle compact edge case in idempotency check (#9588)
This commit is contained in:
@@ -269,12 +269,31 @@ async def _prepare_in_context_messages_no_persist_async(
|
||||
for msg in reversed(recent_messages):
|
||||
if msg.role == "tool" and validate_persisted_tool_call_ids(msg, input_messages[0]):
|
||||
logger.info(
|
||||
f"Idempotency check: Found matching tool return in recent history. "
|
||||
f"Idempotency check: Found matching tool return in recent in-context history. "
|
||||
f"tool_returns={msg.tool_returns}, approval_response.approvals={input_messages[0].approvals}"
|
||||
)
|
||||
approval_already_processed = True
|
||||
break
|
||||
|
||||
# If not found in context and summarization just happened, check full history
|
||||
non_system_summary_messages = [
|
||||
m for m in current_in_context_messages if m.role not in (MessageRole.system, MessageRole.summary)
|
||||
]
|
||||
if not approval_already_processed and len(non_system_summary_messages) == 0:
|
||||
last_tool_messages = await message_manager.list_messages(
|
||||
actor=actor,
|
||||
agent_id=agent_state.id,
|
||||
roles=[MessageRole.tool],
|
||||
limit=1,
|
||||
ascending=False, # Most recent first
|
||||
)
|
||||
if len(last_tool_messages) == 1 and validate_persisted_tool_call_ids(last_tool_messages[0], input_messages[0]):
|
||||
logger.info(
|
||||
f"Idempotency check: Found matching tool return in full history (post-compaction). "
|
||||
f"tool_returns={last_tool_messages[0].tool_returns}, approval_response.approvals={input_messages[0].approvals}"
|
||||
)
|
||||
approval_already_processed = True
|
||||
|
||||
if approval_already_processed:
|
||||
# Approval already handled, just process follow-up messages if any or manually inject keep-alive message
|
||||
keep_alive_messages = input_messages[1:] or [
|
||||
|
||||
@@ -1445,3 +1445,116 @@ def test_approve_with_cancellation(
|
||||
assert len(messages) > 0, "Should have persisted new messages"
|
||||
assert messages[0].message_type == "user_message", "First message should be a user message"
|
||||
assert "keep-alive" in messages[0].content, f"Expected keep-alive message, got '{messages[0].content}'"
|
||||
|
||||
|
||||
def test_retry_with_summarization(
|
||||
client: Letta,
|
||||
agent: AgentState,
|
||||
) -> None:
|
||||
"""
|
||||
Test that approval retry works correctly after summarization evicts messages from context.
|
||||
|
||||
Scenario:
|
||||
1. Send message that triggers approval request
|
||||
2. Send approval response, but cancel during LLM processing
|
||||
3. Call summarize with mode='all' to evict all messages from context
|
||||
4. Verify only system and summary messages remain in context
|
||||
5. Retry the original approval response - should succeed via idempotency check
|
||||
"""
|
||||
import threading
|
||||
import time
|
||||
|
||||
# Step 1: Send message that triggers approval request
|
||||
response = client.agents.messages.create(
|
||||
agent_id=agent.id,
|
||||
messages=USER_MESSAGE_TEST_APPROVAL,
|
||||
)
|
||||
tool_call_id = response.messages[-1].tool_call.tool_call_id
|
||||
|
||||
# Step 2: Start cancellation in background thread
|
||||
def cancel_after_delay():
|
||||
time.sleep(0.3) # Wait for stream to start
|
||||
client.agents.messages.cancel(agent_id=agent.id)
|
||||
|
||||
cancel_thread = threading.Thread(target=cancel_after_delay, daemon=True)
|
||||
cancel_thread.start()
|
||||
|
||||
# Step 3: Start approval stream (will be cancelled during processing)
|
||||
response = client.agents.messages.stream(
|
||||
agent_id=agent.id,
|
||||
messages=[
|
||||
{
|
||||
"type": "approval",
|
||||
"approvals": [
|
||||
{
|
||||
"type": "tool",
|
||||
"tool_call_id": tool_call_id,
|
||||
"tool_return": SECRET_CODE,
|
||||
"status": "success",
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
streaming=True,
|
||||
stream_tokens=True,
|
||||
)
|
||||
|
||||
# Step 4: Accumulate chunks (stream will be cancelled)
|
||||
messages = accumulate_chunks(response)
|
||||
|
||||
# Step 5: Verify we got cancelled
|
||||
stop_reasons = [msg for msg in messages if hasattr(msg, "message_type") and msg.message_type == "stop_reason"]
|
||||
assert len(stop_reasons) == 1, f"Expected exactly 1 stop_reason, got {len(stop_reasons)}"
|
||||
assert stop_reasons[0].stop_reason == "cancelled", f"Expected stop_reason 'cancelled', got '{stop_reasons[0].stop_reason}'"
|
||||
|
||||
cancel_thread.join(timeout=1.0)
|
||||
|
||||
# Step 6: Verify tool return message is persisted
|
||||
all_messages = client.agents.messages.list(agent_id=agent.id, limit=100).items
|
||||
tool_return_messages = [m for m in all_messages if m.message_type == "tool_return_message"]
|
||||
assert len(tool_return_messages) > 0, "Tool return message should be persisted"
|
||||
|
||||
# Step 7: Call compact with mode='all' to evict all messages from context
|
||||
compaction_response = client.agents.messages.compact(
|
||||
agent_id=agent.id,
|
||||
compaction_settings={"mode": "all"},
|
||||
)
|
||||
|
||||
# Step 8: Verify only system and summary messages remain in context (should be 2)
|
||||
assert compaction_response.num_messages_after == 2, (
|
||||
f"Expected 2 messages (system + summary) after compaction, but got {compaction_response.num_messages_after}"
|
||||
)
|
||||
|
||||
logger.info(f"✅ After compaction: {compaction_response.num_messages_before} -> {compaction_response.num_messages_after} messages")
|
||||
|
||||
# Step 9: Retry the original approval response - should succeed via idempotency check
|
||||
response = client.agents.messages.stream(
|
||||
agent_id=agent.id,
|
||||
messages=[
|
||||
{
|
||||
"type": "approval",
|
||||
"approvals": [
|
||||
{
|
||||
"type": "tool",
|
||||
"tool_call_id": tool_call_id,
|
||||
"tool_return": SECRET_CODE,
|
||||
"status": "success",
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
streaming=True,
|
||||
stream_tokens=True,
|
||||
)
|
||||
|
||||
# Step 10: Accumulate chunks
|
||||
messages = accumulate_chunks(response)
|
||||
|
||||
# Step 11: Verify we got chunks AND an end_turn stop reason (not an error)
|
||||
assert len(messages) > 1, "Should receive at least some chunks"
|
||||
|
||||
stop_reasons = [msg for msg in messages if hasattr(msg, "message_type") and msg.message_type == "stop_reason"]
|
||||
assert len(stop_reasons) == 1, f"Expected exactly 1 stop_reason, got {len(stop_reasons)}"
|
||||
assert stop_reasons[0].stop_reason == "end_turn", f"Expected stop_reason 'end_turn', got '{stop_reasons[0].stop_reason}'"
|
||||
|
||||
logger.info("✅ Test passed: approval retry after summarization handled correctly via idempotency check")
|
||||
|
||||
Reference in New Issue
Block a user