feat: add credit verification before agent message endpoints [LET-XXXX] (#9433)
* feat: add credit verification before agent message endpoints
Add credit verification checks to message endpoints to prevent
execution when organizations have insufficient credits.
- Add InsufficientCreditsError exception type
- Add CreditVerificationService that calls step-orchestrator API
- Add credit checks to /agents/{id}/messages endpoints
- Add credit checks to /conversations/{id}/messages endpoint
🐾 Generated with [Letta Code](https://letta.com)
Co-Authored-By: Letta <noreply@letta.com>
* surface error in ADE
* do per step instead
* parallel check
* parallel to step
* small fixes
* stage publish api
* fixes
* revert unnecessary frontend changes
* insufficient credits stop reason
---------
Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
@@ -43877,6 +43877,7 @@
|
|||||||
"no_tool_call",
|
"no_tool_call",
|
||||||
"tool_rule",
|
"tool_rule",
|
||||||
"cancelled",
|
"cancelled",
|
||||||
|
"insufficient_credits",
|
||||||
"requires_approval",
|
"requires_approval",
|
||||||
"context_window_overflow_in_system_prompt"
|
"context_window_overflow_in_system_prompt"
|
||||||
],
|
],
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ from letta.agents.helpers import (
|
|||||||
generate_step_id,
|
generate_step_id,
|
||||||
)
|
)
|
||||||
from letta.constants import DEFAULT_MAX_STEPS, NON_USER_MSG_PREFIX, REQUEST_HEARTBEAT_PARAM
|
from letta.constants import DEFAULT_MAX_STEPS, NON_USER_MSG_PREFIX, REQUEST_HEARTBEAT_PARAM
|
||||||
from letta.errors import ContextWindowExceededError, LLMError
|
from letta.errors import ContextWindowExceededError, InsufficientCreditsError, LLMError
|
||||||
from letta.helpers import ToolRulesSolver
|
from letta.helpers import ToolRulesSolver
|
||||||
from letta.helpers.datetime_helpers import get_utc_time, get_utc_timestamp_ns, ns_to_ms
|
from letta.helpers.datetime_helpers import get_utc_time, get_utc_timestamp_ns, ns_to_ms
|
||||||
from letta.helpers.reasoning_helper import scrub_inner_thoughts_from_messages
|
from letta.helpers.reasoning_helper import scrub_inner_thoughts_from_messages
|
||||||
@@ -58,6 +58,7 @@ from letta.server.rest_api.utils import (
|
|||||||
from letta.services.agent_manager import AgentManager
|
from letta.services.agent_manager import AgentManager
|
||||||
from letta.services.archive_manager import ArchiveManager
|
from letta.services.archive_manager import ArchiveManager
|
||||||
from letta.services.block_manager import BlockManager
|
from letta.services.block_manager import BlockManager
|
||||||
|
from letta.services.credit_verification_service import CreditVerificationService
|
||||||
from letta.services.helpers.tool_parser_helper import runtime_override_tool_json_schema
|
from letta.services.helpers.tool_parser_helper import runtime_override_tool_json_schema
|
||||||
from letta.services.message_manager import MessageManager
|
from letta.services.message_manager import MessageManager
|
||||||
from letta.services.passage_manager import PassageManager
|
from letta.services.passage_manager import PassageManager
|
||||||
@@ -70,7 +71,7 @@ from letta.services.tool_executor.tool_execution_manager import ToolExecutionMan
|
|||||||
from letta.settings import model_settings, settings, summarizer_settings
|
from letta.settings import model_settings, settings, summarizer_settings
|
||||||
from letta.system import package_function_response
|
from letta.system import package_function_response
|
||||||
from letta.types import JsonDict
|
from letta.types import JsonDict
|
||||||
from letta.utils import log_telemetry, safe_create_task, united_diff, validate_function_response
|
from letta.utils import log_telemetry, safe_create_task, safe_create_task_with_return, united_diff, validate_function_response
|
||||||
|
|
||||||
|
|
||||||
class LettaAgentV2(BaseAgentV2):
|
class LettaAgentV2(BaseAgentV2):
|
||||||
@@ -106,6 +107,7 @@ class LettaAgentV2(BaseAgentV2):
|
|||||||
self.passage_manager = PassageManager()
|
self.passage_manager = PassageManager()
|
||||||
self.step_manager = StepManager()
|
self.step_manager = StepManager()
|
||||||
self.telemetry_manager = TelemetryManager()
|
self.telemetry_manager = TelemetryManager()
|
||||||
|
self.credit_verification_service = CreditVerificationService()
|
||||||
|
|
||||||
## TODO: Expand to more
|
## TODO: Expand to more
|
||||||
# if summarizer_settings.enable_summarization and model_settings.openai_api_key:
|
# if summarizer_settings.enable_summarization and model_settings.openai_api_key:
|
||||||
@@ -209,9 +211,18 @@ class LettaAgentV2(BaseAgentV2):
|
|||||||
)
|
)
|
||||||
in_context_messages = in_context_messages + input_messages_to_persist
|
in_context_messages = in_context_messages + input_messages_to_persist
|
||||||
response_letta_messages = []
|
response_letta_messages = []
|
||||||
|
credit_task = None
|
||||||
for i in range(max_steps):
|
for i in range(max_steps):
|
||||||
remaining_turns = max_steps - i - 1
|
remaining_turns = max_steps - i - 1
|
||||||
|
|
||||||
|
# Await credit check from previous iteration before running next step
|
||||||
|
if credit_task is not None:
|
||||||
|
if not await credit_task:
|
||||||
|
self.should_continue = False
|
||||||
|
self.stop_reason = LettaStopReason(stop_reason=StopReasonType.insufficient_credits)
|
||||||
|
break
|
||||||
|
credit_task = None
|
||||||
|
|
||||||
response = self._step(
|
response = self._step(
|
||||||
messages=in_context_messages + self.response_messages,
|
messages=in_context_messages + self.response_messages,
|
||||||
input_messages_to_persist=input_messages_to_persist,
|
input_messages_to_persist=input_messages_to_persist,
|
||||||
@@ -238,6 +249,9 @@ class LettaAgentV2(BaseAgentV2):
|
|||||||
if not self.should_continue:
|
if not self.should_continue:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
# Fire credit check to run in parallel with loop overhead / next step setup
|
||||||
|
credit_task = safe_create_task_with_return(self._check_credits())
|
||||||
|
|
||||||
input_messages_to_persist = []
|
input_messages_to_persist = []
|
||||||
|
|
||||||
# Rebuild context window after stepping
|
# Rebuild context window after stepping
|
||||||
@@ -332,7 +346,16 @@ class LettaAgentV2(BaseAgentV2):
|
|||||||
input_messages, self.agent_state, self.message_manager, self.actor, run_id
|
input_messages, self.agent_state, self.message_manager, self.actor, run_id
|
||||||
)
|
)
|
||||||
in_context_messages = in_context_messages + input_messages_to_persist
|
in_context_messages = in_context_messages + input_messages_to_persist
|
||||||
|
credit_task = None
|
||||||
for i in range(max_steps):
|
for i in range(max_steps):
|
||||||
|
# Await credit check from previous iteration before running next step
|
||||||
|
if credit_task is not None:
|
||||||
|
if not await credit_task:
|
||||||
|
self.should_continue = False
|
||||||
|
self.stop_reason = LettaStopReason(stop_reason=StopReasonType.insufficient_credits)
|
||||||
|
break
|
||||||
|
credit_task = None
|
||||||
|
|
||||||
response = self._step(
|
response = self._step(
|
||||||
messages=in_context_messages + self.response_messages,
|
messages=in_context_messages + self.response_messages,
|
||||||
input_messages_to_persist=input_messages_to_persist,
|
input_messages_to_persist=input_messages_to_persist,
|
||||||
@@ -351,6 +374,9 @@ class LettaAgentV2(BaseAgentV2):
|
|||||||
if not self.should_continue:
|
if not self.should_continue:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
# Fire credit check to run in parallel with loop overhead / next step setup
|
||||||
|
credit_task = safe_create_task_with_return(self._check_credits())
|
||||||
|
|
||||||
input_messages_to_persist = []
|
input_messages_to_persist = []
|
||||||
|
|
||||||
if self.stop_reason is None:
|
if self.stop_reason is None:
|
||||||
@@ -676,6 +702,15 @@ class LettaAgentV2(BaseAgentV2):
|
|||||||
self.last_function_response = None
|
self.last_function_response = None
|
||||||
self.response_messages = []
|
self.response_messages = []
|
||||||
|
|
||||||
|
async def _check_credits(self) -> bool:
|
||||||
|
"""Check if the organization still has credits. Returns True if OK or not configured."""
|
||||||
|
try:
|
||||||
|
await self.credit_verification_service.verify_credits(self.actor.organization_id)
|
||||||
|
return True
|
||||||
|
except InsufficientCreditsError:
|
||||||
|
self.logger.warning(f"Insufficient credits for organization {self.actor.organization_id}, stopping agent loop")
|
||||||
|
return False
|
||||||
|
|
||||||
@trace_method
|
@trace_method
|
||||||
async def _check_run_cancellation(self, run_id) -> bool:
|
async def _check_run_cancellation(self, run_id) -> bool:
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -1,4 +1,3 @@
|
|||||||
import asyncio
|
|
||||||
import json
|
import json
|
||||||
import uuid
|
import uuid
|
||||||
from typing import Any, AsyncGenerator, Dict, Literal, Optional
|
from typing import Any, AsyncGenerator, Dict, Literal, Optional
|
||||||
@@ -65,7 +64,7 @@ from letta.services.summarizer.summarizer_config import CompactionSettings
|
|||||||
from letta.services.summarizer.summarizer_sliding_window import count_tokens
|
from letta.services.summarizer.summarizer_sliding_window import count_tokens
|
||||||
from letta.settings import settings, summarizer_settings
|
from letta.settings import settings, summarizer_settings
|
||||||
from letta.system import package_function_response, package_summarize_message_no_counts
|
from letta.system import package_function_response, package_summarize_message_no_counts
|
||||||
from letta.utils import log_telemetry, validate_function_response
|
from letta.utils import log_telemetry, safe_create_task_with_return, validate_function_response
|
||||||
|
|
||||||
|
|
||||||
def extract_compaction_stats_from_message(message: Message) -> CompactionStats | None:
|
def extract_compaction_stats_from_message(message: Message) -> CompactionStats | None:
|
||||||
@@ -237,11 +236,20 @@ class LettaAgentV3(LettaAgentV2):
|
|||||||
user_id=self.actor.id,
|
user_id=self.actor.id,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
credit_task = None
|
||||||
for i in range(max_steps):
|
for i in range(max_steps):
|
||||||
if i == 1 and follow_up_messages:
|
if i == 1 and follow_up_messages:
|
||||||
input_messages_to_persist = follow_up_messages
|
input_messages_to_persist = follow_up_messages
|
||||||
follow_up_messages = []
|
follow_up_messages = []
|
||||||
|
|
||||||
|
# Await credit check from previous iteration before running next step
|
||||||
|
if credit_task is not None:
|
||||||
|
if not await credit_task:
|
||||||
|
self.should_continue = False
|
||||||
|
self.stop_reason = LettaStopReason(stop_reason=StopReasonType.insufficient_credits)
|
||||||
|
break
|
||||||
|
credit_task = None
|
||||||
|
|
||||||
response = self._step(
|
response = self._step(
|
||||||
# we append input_messages_to_persist since they aren't checkpointed as in-context until the end of the step (may be rolled back)
|
# we append input_messages_to_persist since they aren't checkpointed as in-context until the end of the step (may be rolled back)
|
||||||
messages=list(self.in_context_messages + input_messages_to_persist),
|
messages=list(self.in_context_messages + input_messages_to_persist),
|
||||||
@@ -289,6 +297,9 @@ class LettaAgentV3(LettaAgentV2):
|
|||||||
if not self.should_continue:
|
if not self.should_continue:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
# Fire credit check to run in parallel with loop overhead / next step setup
|
||||||
|
credit_task = safe_create_task_with_return(self._check_credits())
|
||||||
|
|
||||||
# input_messages_to_persist = []
|
# input_messages_to_persist = []
|
||||||
|
|
||||||
if i == max_steps - 1 and self.stop_reason is None:
|
if i == max_steps - 1 and self.stop_reason is None:
|
||||||
@@ -453,10 +464,20 @@ class LettaAgentV3(LettaAgentV2):
|
|||||||
input_messages_to_persist = [input_messages_to_persist[0]]
|
input_messages_to_persist = [input_messages_to_persist[0]]
|
||||||
|
|
||||||
self.in_context_messages = in_context_messages
|
self.in_context_messages = in_context_messages
|
||||||
|
credit_task = None
|
||||||
for i in range(max_steps):
|
for i in range(max_steps):
|
||||||
if i == 1 and follow_up_messages:
|
if i == 1 and follow_up_messages:
|
||||||
input_messages_to_persist = follow_up_messages
|
input_messages_to_persist = follow_up_messages
|
||||||
follow_up_messages = []
|
follow_up_messages = []
|
||||||
|
|
||||||
|
# Await credit check from previous iteration before running next step
|
||||||
|
if credit_task is not None:
|
||||||
|
if not await credit_task:
|
||||||
|
self.should_continue = False
|
||||||
|
self.stop_reason = LettaStopReason(stop_reason=StopReasonType.insufficient_credits)
|
||||||
|
break
|
||||||
|
credit_task = None
|
||||||
|
|
||||||
response = self._step(
|
response = self._step(
|
||||||
# we append input_messages_to_persist since they aren't checkpointed as in-context until the end of the step (may be rolled back)
|
# we append input_messages_to_persist since they aren't checkpointed as in-context until the end of the step (may be rolled back)
|
||||||
messages=list(self.in_context_messages + input_messages_to_persist),
|
messages=list(self.in_context_messages + input_messages_to_persist),
|
||||||
@@ -486,6 +507,9 @@ class LettaAgentV3(LettaAgentV2):
|
|||||||
if not self.should_continue:
|
if not self.should_continue:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
# Fire credit check to run in parallel with loop overhead / next step setup
|
||||||
|
credit_task = safe_create_task_with_return(self._check_credits())
|
||||||
|
|
||||||
if i == max_steps - 1 and self.stop_reason is None:
|
if i == max_steps - 1 and self.stop_reason is None:
|
||||||
self.stop_reason = LettaStopReason(stop_reason=StopReasonType.max_steps.value)
|
self.stop_reason = LettaStopReason(stop_reason=StopReasonType.max_steps.value)
|
||||||
|
|
||||||
|
|||||||
@@ -451,6 +451,16 @@ class AgentFileImportError(Exception):
|
|||||||
"""Exception raised during agent file import operations"""
|
"""Exception raised during agent file import operations"""
|
||||||
|
|
||||||
|
|
||||||
|
class InsufficientCreditsError(LettaError):
    """Signals that an organization has run out of credits.

    The exception takes no arguments: it always carries the same
    human-readable message and an ``error_code`` of ``INSUFFICIENT_CREDITS``
    in its details.
    """

    def __init__(self):
        # Fixed payload; callers raise this as ``InsufficientCreditsError()``.
        error_details = {"error_code": "INSUFFICIENT_CREDITS"}
        super().__init__(message="Insufficient credits to process this request.", details=error_details)
|
||||||
|
|
||||||
|
|
||||||
class RunCancelError(LettaError):
|
class RunCancelError(LettaError):
|
||||||
"""Error raised when a run cannot be cancelled."""
|
"""Error raised when a run cannot be cancelled."""
|
||||||
|
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ class StopReasonType(str, Enum):
|
|||||||
no_tool_call = "no_tool_call"
|
no_tool_call = "no_tool_call"
|
||||||
tool_rule = "tool_rule"
|
tool_rule = "tool_rule"
|
||||||
cancelled = "cancelled"
|
cancelled = "cancelled"
|
||||||
|
insufficient_credits = "insufficient_credits"
|
||||||
requires_approval = "requires_approval"
|
requires_approval = "requires_approval"
|
||||||
context_window_overflow_in_system_prompt = "context_window_overflow_in_system_prompt"
|
context_window_overflow_in_system_prompt = "context_window_overflow_in_system_prompt"
|
||||||
|
|
||||||
@@ -42,6 +43,8 @@ class StopReasonType(str, Enum):
|
|||||||
return RunStatus.failed
|
return RunStatus.failed
|
||||||
elif self == StopReasonType.cancelled:
|
elif self == StopReasonType.cancelled:
|
||||||
return RunStatus.cancelled
|
return RunStatus.cancelled
|
||||||
|
elif self == StopReasonType.insufficient_credits:
|
||||||
|
return RunStatus.failed
|
||||||
else:
|
else:
|
||||||
raise ValueError("Unknown StopReasonType")
|
raise ValueError("Unknown StopReasonType")
|
||||||
|
|
||||||
|
|||||||
56
letta/services/credit_verification_service.py
Normal file
56
letta/services/credit_verification_service.py
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
import logging
|
||||||
|
import os
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
from letta.errors import InsufficientCreditsError
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class CreditVerificationService:
    """Fail-open client for the step-orchestrator credit validation endpoint.

    Configuration is read from the environment once, at construction time:

    * ``STEP_ORCHESTRATOR_ENDPOINT`` - base URL of the step-orchestrator API.
    * ``STEP_COMPLETE_KEY`` - bearer token sent with each request.

    When either value is missing the service is considered "not configured"
    and every organization is treated as having credits.
    """

    # Upper bound (seconds) for one validation request.
    _TIMEOUT_SECONDS = 5.0

    def __init__(self):
        self.endpoint = os.getenv("STEP_ORCHESTRATOR_ENDPOINT")
        self.auth_key = os.getenv("STEP_COMPLETE_KEY")

    @staticmethod
    def _validation_url(endpoint: str, organization_id: str) -> str:
        """Build the validation URL, tolerating a trailing slash on ``endpoint``."""
        return f"{endpoint.rstrip('/')}/validate/core-organizations/{organization_id}"

    async def verify_credits(self, organization_id: str) -> bool:
        """Check if an organization has enough credits to proceed.

        The check is deliberately fail-open: a timeout, a non-2xx response, a
        malformed body, or any other unexpected failure is logged and treated
        as "credits available" so that an outage of the validation endpoint
        does not block agent execution.

        Args:
            organization_id: Identifier of the organization to validate.

        Returns:
            ``True`` if credits are available, if the service is not
            configured, or if the check could not be completed.

        Raises:
            InsufficientCreditsError: If the endpoint reports that no credits remain.
        """
        if not self.endpoint or not self.auth_key:
            return True

        url = self._validation_url(self.endpoint, organization_id)
        # auth_key is guaranteed non-empty by the guard above.
        headers = {"Authorization": f"Bearer {self.auth_key}"}

        try:
            async with httpx.AsyncClient(timeout=self._TIMEOUT_SECONDS) as client:
                response = await client.get(url, headers=headers)
                response.raise_for_status()
                payload = response.json()
        except httpx.TimeoutException:
            logger.warning("Timeout verifying credits for organization %s", organization_id)
            return True
        except httpx.HTTPStatusError as e:
            logger.warning(
                "HTTP error verifying credits for organization %s: %s",
                organization_id,
                e.response.status_code,
            )
            return True
        except Exception as e:
            # Best-effort boundary: keep the traceback, but never block the agent on it.
            logger.exception("Unexpected error verifying credits for organization %s: %s", organization_id, e)
            return True

        if not isinstance(payload, dict):
            logger.warning(
                "Malformed credit verification response for organization %s: expected a JSON object",
                organization_id,
            )
            return True

        # Evaluated outside the try block so the catch-all above can never swallow this error.
        if not payload.get("hasMoreCredits", True):
            raise InsufficientCreditsError()

        return True
|
||||||
Reference in New Issue
Block a user