From 0a8a8fda54b82c910ab4e3f5bca07b9e8cde599f Mon Sep 17 00:00:00 2001 From: Ari Webb Date: Fri, 13 Feb 2026 11:54:31 -0800 Subject: [PATCH] feat: add credit verification before agent message endpoints [LET-XXXX] (#9433) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: add credit verification before agent message endpoints Add credit verification checks to message endpoints to prevent execution when organizations have insufficient credits. - Add InsufficientCreditsError exception type - Add CreditVerificationService that calls step-orchestrator API - Add credit checks to /agents/{id}/messages endpoints - Add credit checks to /conversations/{id}/messages endpoint 🐾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta * surface error in ade * do per step instead * parallel check * parallel to step * small fixes * stage publish api * fixes * revert unnecessary frontend changes * insufficient credits stop reason --------- Co-authored-by: Letta --- fern/openapi.json | 1 + letta/agents/letta_agent_v2.py | 39 ++++++++++++- letta/agents/letta_agent_v3.py | 28 +++++++++- letta/errors.py | 10 ++++ letta/schemas/letta_stop_reason.py | 3 + letta/services/credit_verification_service.py | 56 +++++++++++++++++++ 6 files changed, 133 insertions(+), 4 deletions(-) create mode 100644 letta/services/credit_verification_service.py diff --git a/fern/openapi.json b/fern/openapi.json index 51602e95..2d3d3831 100644 --- a/fern/openapi.json +++ b/fern/openapi.json @@ -43877,6 +43877,7 @@ "no_tool_call", "tool_rule", "cancelled", + "insufficient_credits", "requires_approval", "context_window_overflow_in_system_prompt" ], diff --git a/letta/agents/letta_agent_v2.py b/letta/agents/letta_agent_v2.py index 2c0bf6ca..02ed2e46 100644 --- a/letta/agents/letta_agent_v2.py +++ b/letta/agents/letta_agent_v2.py @@ -20,7 +20,7 @@ from letta.agents.helpers import ( generate_step_id, ) from letta.constants import DEFAULT_MAX_STEPS, NON_USER_MSG_PREFIX, REQUEST_HEARTBEAT_PARAM -from letta.errors import ContextWindowExceededError, LLMError +from letta.errors import ContextWindowExceededError, InsufficientCreditsError, LLMError from letta.helpers import ToolRulesSolver from letta.helpers.datetime_helpers import get_utc_time, get_utc_timestamp_ns, ns_to_ms from letta.helpers.reasoning_helper import scrub_inner_thoughts_from_messages @@ -58,6 +58,7 @@ from letta.server.rest_api.utils import ( from letta.services.agent_manager import AgentManager from letta.services.archive_manager import ArchiveManager from letta.services.block_manager import BlockManager +from letta.services.credit_verification_service import CreditVerificationService from letta.services.helpers.tool_parser_helper import runtime_override_tool_json_schema from letta.services.message_manager import MessageManager from letta.services.passage_manager import PassageManager @@ -70,7 +71,7 @@ from letta.services.tool_executor.tool_execution_manager import ToolExecutionMan from letta.settings import model_settings, settings, summarizer_settings from letta.system import package_function_response from letta.types import JsonDict -from letta.utils import log_telemetry, safe_create_task, united_diff, validate_function_response +from letta.utils import log_telemetry, safe_create_task, safe_create_task_with_return, united_diff, validate_function_response class LettaAgentV2(BaseAgentV2): @@ -106,6 +107,7 @@ class LettaAgentV2(BaseAgentV2): self.passage_manager = PassageManager() self.step_manager = StepManager() self.telemetry_manager = TelemetryManager() + self.credit_verification_service = CreditVerificationService() ## TODO: Expand to more # if summarizer_settings.enable_summarization and model_settings.openai_api_key: @@ -209,9 +211,18 @@ class LettaAgentV2(BaseAgentV2): ) in_context_messages = in_context_messages + input_messages_to_persist response_letta_messages = [] + credit_task = None for i in range(max_steps): remaining_turns = max_steps - i - 1 + # Await credit check from previous iteration before running next step + if credit_task is not None: + if not await credit_task: + self.should_continue = False + self.stop_reason = LettaStopReason(stop_reason=StopReasonType.insufficient_credits) + break + credit_task = None + response = self._step( messages=in_context_messages + self.response_messages, input_messages_to_persist=input_messages_to_persist, @@ -238,6 +249,9 @@ class LettaAgentV2(BaseAgentV2): if not self.should_continue: break + # Fire credit check to run in parallel with loop overhead / next step setup + credit_task = safe_create_task_with_return(self._check_credits()) + input_messages_to_persist = [] # Rebuild context window after stepping @@ -332,7 +346,16 @@ class LettaAgentV2(BaseAgentV2): input_messages, self.agent_state, self.message_manager, self.actor, run_id ) in_context_messages = in_context_messages + input_messages_to_persist + credit_task = None for i in range(max_steps): + # Await credit check from previous iteration before running next step + if credit_task is not None: + if not await credit_task: + self.should_continue = False + self.stop_reason = LettaStopReason(stop_reason=StopReasonType.insufficient_credits) + break + credit_task = None + response = self._step( messages=in_context_messages + self.response_messages, input_messages_to_persist=input_messages_to_persist, @@ -351,6 +374,9 @@ class LettaAgentV2(BaseAgentV2): if not self.should_continue: break + # Fire credit check to run in parallel with loop overhead / next step setup + credit_task = safe_create_task_with_return(self._check_credits()) + input_messages_to_persist = [] if self.stop_reason is None: @@ -676,6 +702,15 @@ class LettaAgentV2(BaseAgentV2): self.last_function_response = None self.response_messages = [] + async def _check_credits(self) -> bool: + """Check if the organization still has credits. Returns True if OK or not configured.""" + try: + await self.credit_verification_service.verify_credits(self.actor.organization_id) + return True + except InsufficientCreditsError: + self.logger.warning(f"Insufficient credits for organization {self.actor.organization_id}, stopping agent loop") + return False + @trace_method async def _check_run_cancellation(self, run_id) -> bool: try: diff --git a/letta/agents/letta_agent_v3.py b/letta/agents/letta_agent_v3.py index db1a7df5..480c9e74 100644 --- a/letta/agents/letta_agent_v3.py +++ b/letta/agents/letta_agent_v3.py @@ -1,4 +1,3 @@ -import asyncio import json import uuid from typing import Any, AsyncGenerator, Dict, Literal, Optional @@ -65,7 +64,7 @@ from letta.services.summarizer.summarizer_config import CompactionSettings from letta.services.summarizer.summarizer_sliding_window import count_tokens from letta.settings import settings, summarizer_settings from letta.system import package_function_response, package_summarize_message_no_counts -from letta.utils import log_telemetry, validate_function_response +from letta.utils import log_telemetry, safe_create_task_with_return, validate_function_response def extract_compaction_stats_from_message(message: Message) -> CompactionStats | None: @@ -237,11 +236,20 @@ class LettaAgentV3(LettaAgentV2): user_id=self.actor.id, ) + credit_task = None for i in range(max_steps): if i == 1 and follow_up_messages: input_messages_to_persist = follow_up_messages follow_up_messages = [] + # Await credit check from previous iteration before running next step + if credit_task is not None: + if not await credit_task: + self.should_continue = False + self.stop_reason = LettaStopReason(stop_reason=StopReasonType.insufficient_credits) + break + credit_task = None + response = self._step( # we append input_messages_to_persist since they aren't checkpointed as in-context until the end of the step (may be rolled back) messages=list(self.in_context_messages + input_messages_to_persist), @@ -289,6 +297,9 @@ class LettaAgentV3(LettaAgentV2): if not self.should_continue: break + # Fire credit check to run in parallel with loop overhead / next step setup + credit_task = safe_create_task_with_return(self._check_credits()) + # input_messages_to_persist = [] if i == max_steps - 1 and self.stop_reason is None: @@ -453,10 +464,20 @@ class LettaAgentV3(LettaAgentV2): input_messages_to_persist = [input_messages_to_persist[0]] self.in_context_messages = in_context_messages + credit_task = None for i in range(max_steps): if i == 1 and follow_up_messages: input_messages_to_persist = follow_up_messages follow_up_messages = [] + + # Await credit check from previous iteration before running next step + if credit_task is not None: + if not await credit_task: + self.should_continue = False + self.stop_reason = LettaStopReason(stop_reason=StopReasonType.insufficient_credits) + break + credit_task = None + response = self._step( # we append input_messages_to_persist since they aren't checkpointed as in-context until the end of the step (may be rolled back) messages=list(self.in_context_messages + input_messages_to_persist), @@ -486,6 +507,9 @@ class LettaAgentV3(LettaAgentV2): if not self.should_continue: break + # Fire credit check to run in parallel with loop overhead / next step setup + credit_task = safe_create_task_with_return(self._check_credits()) + if i == max_steps - 1 and self.stop_reason is None: self.stop_reason = LettaStopReason(stop_reason=StopReasonType.max_steps.value) diff --git a/letta/errors.py b/letta/errors.py index ccf98125..5795ef69 100644 --- a/letta/errors.py +++ b/letta/errors.py @@ -451,6 +451,16 @@ class AgentFileImportError(Exception): """Exception raised during agent file import operations""" +class InsufficientCreditsError(LettaError): + """Raised when an organization has no remaining credits.""" + + def __init__(self): + super().__init__( + message="Insufficient credits to process this request.", + details={"error_code": "INSUFFICIENT_CREDITS"}, + ) + + class RunCancelError(LettaError): """Error raised when a run cannot be cancelled.""" diff --git a/letta/schemas/letta_stop_reason.py b/letta/schemas/letta_stop_reason.py index abb28aa7..81c8ac3e 100644 --- a/letta/schemas/letta_stop_reason.py +++ b/letta/schemas/letta_stop_reason.py @@ -17,6 +17,7 @@ class StopReasonType(str, Enum): no_tool_call = "no_tool_call" tool_rule = "tool_rule" cancelled = "cancelled" + insufficient_credits = "insufficient_credits" requires_approval = "requires_approval" context_window_overflow_in_system_prompt = "context_window_overflow_in_system_prompt" @@ -42,6 +43,8 @@ class StopReasonType(str, Enum): return RunStatus.failed elif self == StopReasonType.cancelled: return RunStatus.cancelled + elif self == StopReasonType.insufficient_credits: + return RunStatus.failed else: raise ValueError("Unknown StopReasonType") diff --git a/letta/services/credit_verification_service.py b/letta/services/credit_verification_service.py new file mode 100644 index 00000000..cf4977e7 --- /dev/null +++ b/letta/services/credit_verification_service.py @@ -0,0 +1,56 @@ +import logging +import os + +import httpx + +from letta.errors import InsufficientCreditsError + +logger = logging.getLogger(__name__) + + +class CreditVerificationService: + """Service for verifying organization credit balance before agent execution.""" + + def __init__(self): + self.endpoint = os.getenv("STEP_ORCHESTRATOR_ENDPOINT") + self.auth_key = os.getenv("STEP_COMPLETE_KEY") + + async def verify_credits(self, organization_id: str) -> bool: + """ + Check if an organization has enough credits to proceed. + + Returns True if credits are available or if the service is not configured. + Raises InsufficientCreditsError if no credits remain. + """ + if not self.endpoint or not self.auth_key: + return True + + try: + headers = {} + if self.auth_key: + headers["Authorization"] = f"Bearer {self.auth_key}" + + async with httpx.AsyncClient(timeout=5.0) as client: + response = await client.get( + f"{self.endpoint}/validate/core-organizations/{organization_id}", + headers=headers, + ) + response.raise_for_status() + + data = response.json() + if not data.get("hasMoreCredits", True): + raise InsufficientCreditsError() + + return True + + except InsufficientCreditsError: + raise + except httpx.TimeoutException: + logger.warning(f"Timeout verifying credits for organization {organization_id}") + return True + except httpx.HTTPStatusError as e: + logger.warning(f"HTTP error verifying credits for organization {organization_id}: {e.response.status_code}") + return True + except Exception as e: + logger.error(f"Unexpected error verifying credits for organization {organization_id}: {e}") + return True