feat: Create polling job for polling batch results (#1624)

The previous run passed all relevant checks, so the wait was skipped. This new commit just merges main.
This commit is contained in:
Matthew Zhou
2025-04-08 16:42:12 -07:00
committed by GitHub
parent a9c6537bf2
commit 5fe18ec0e9
14 changed files with 772 additions and 63 deletions

0
letta/jobs/__init__.py Normal file
View File

25
letta/jobs/helpers.py Normal file
View File

@@ -0,0 +1,25 @@
from anthropic.types.beta.messages import (
BetaMessageBatchCanceledResult,
BetaMessageBatchIndividualResponse,
BetaMessageBatchSucceededResult,
)
from letta.schemas.enums import JobStatus
def map_anthropic_batch_job_status_to_job_status(anthropic_status: str) -> JobStatus:
    """Translate an Anthropic batch-level `processing_status` string into our internal JobStatus."""
    if anthropic_status == "in_progress":
        return JobStatus.running
    if anthropic_status == "canceling":
        return JobStatus.cancelled
    if anthropic_status == "ended":
        return JobStatus.completed
    # Unknown/unexpected provider status: fall back to pending (same fallback as before).
    return JobStatus.pending
def map_anthropic_individual_batch_item_status_to_job_status(individual_item: BetaMessageBatchIndividualResponse) -> JobStatus:
    """Map a single batch item's result object onto our internal JobStatus."""
    result = individual_item.result
    if isinstance(result, BetaMessageBatchSucceededResult):
        return JobStatus.completed
    if isinstance(result, BetaMessageBatchCanceledResult):
        return JobStatus.cancelled
    # Any other result type (e.g. errored/expired) is treated as a failure.
    return JobStatus.failed

View File

@@ -0,0 +1,204 @@
import asyncio
import datetime
from typing import List
from letta.jobs.helpers import map_anthropic_batch_job_status_to_job_status, map_anthropic_individual_batch_item_status_to_job_status
from letta.jobs.types import BatchId, BatchPollingResult, ItemUpdateInfo
from letta.log import get_logger
from letta.schemas.enums import JobStatus, ProviderType
from letta.schemas.llm_batch_job import LLMBatchJob
from letta.server.server import SyncServer
logger = get_logger(__name__)
class BatchPollingMetrics:
    """Counters and timing for a single run of the batch-polling cron job."""

    def __init__(self):
        # Wall-clock start, used by log_summary() to report elapsed time.
        self.start_time = datetime.datetime.now()
        # How many running batches were found in total.
        self.total_batches = 0
        # How many of those belong to Anthropic.
        self.anthropic_batches = 0
        # Batches still running after this polling cycle.
        self.running_count = 0
        # Batches that finished during this polling cycle.
        self.completed_count = 0
        # Item-level rows updated for newly completed batches.
        self.updated_items_count = 0

    def log_summary(self):
        """Emit the end-of-run telemetry lines, in a fixed order."""
        elapsed = (datetime.datetime.now() - self.start_time).total_seconds()
        summary_lines = (
            f"[Poll BatchJob] Finished poll_running_llm_batches job in {elapsed:.2f}s",
            f"[Poll BatchJob] Found {self.total_batches} running batches total.",
            f"[Poll BatchJob] Found {self.anthropic_batches} Anthropic batch(es) to poll.",
            f"[Poll BatchJob] Final results: {self.completed_count} completed, {self.running_count} still running.",
            f"[Poll BatchJob] Updated {self.updated_items_count} items for newly completed batch(es).",
        )
        for line in summary_lines:
            logger.info(line)
async def fetch_batch_status(server: SyncServer, batch_job: LLMBatchJob) -> BatchPollingResult:
    """
    Fetch the current status of a single batch job from the provider.

    Args:
        server: The SyncServer instance (supplies the Anthropic async client)
        batch_job: The batch job to check status for

    Returns:
        A tuple containing (batch_id, new_status, polling_response); the polling
        response is None when retrieval fails.
    """
    provider_batch_id = batch_job.create_batch_response.id
    try:
        response = await server.anthropic_async_client.beta.messages.batches.retrieve(provider_batch_id)
        new_status = map_anthropic_batch_job_status_to_job_status(response.processing_status)
    except Exception as e:
        logger.warning(f"[Poll BatchJob] Batch {batch_job.id}: failed to retrieve {provider_batch_id}: {e}")
        # We treat a retrieval error as still running to try again next cycle
        return (batch_job.id, JobStatus.running, None)
    logger.debug(f"[Poll BatchJob] Batch {batch_job.id}: provider={response.processing_status} → internal={new_status}")
    return (batch_job.id, new_status, response)
async def fetch_batch_items(server: SyncServer, batch_id: BatchId, batch_resp_id: str) -> List[ItemUpdateInfo]:
    """
    Fetch individual item results for a completed batch.

    Args:
        server: The SyncServer instance
        batch_id: The internal batch ID
        batch_resp_id: The provider's batch response ID

    Returns:
        A list of (batch_id, agent_id, status, raw_item_result) tuples; empty on error.
    """
    collected: List[ItemUpdateInfo] = []
    try:
        result_stream = server.anthropic_async_client.beta.messages.batches.results(batch_resp_id)
        async for item_result in result_stream:
            # Here, custom_id should be the agent_id
            status = map_anthropic_individual_batch_item_status_to_job_status(item_result)
            collected.append((batch_id, item_result.custom_id, status, item_result))
        logger.info(f"[Poll BatchJob] Fetched {len(collected)} item updates for batch {batch_id}.")
    except Exception as e:
        logger.error(f"[Poll BatchJob] Error fetching item updates for batch {batch_id}: {e}")
    return collected
async def poll_batch_updates(server: SyncServer, batch_jobs: List[LLMBatchJob], metrics: BatchPollingMetrics) -> List[BatchPollingResult]:
    """
    Poll for updates to multiple batch jobs concurrently.

    Args:
        server: The SyncServer instance
        batch_jobs: List of batch jobs to poll
        metrics: Metrics collection object

    Returns:
        List of batch polling results
    """
    if not batch_jobs:
        logger.info("[Poll BatchJob] No Anthropic batches to update; job complete.")
        return []
    # Poll every batch concurrently, then persist the new statuses in a single bulk write.
    results: List[BatchPollingResult] = await asyncio.gather(*(fetch_batch_status(server, job) for job in batch_jobs))
    server.batch_manager.bulk_update_batch_statuses(updates=results)
    logger.info(f"[Poll BatchJob] Bulk-updated {len(results)} LLM batch(es) in the DB at job level.")
    return results
async def process_completed_batches(
    server: SyncServer, batch_results: List[BatchPollingResult], metrics: BatchPollingMetrics
) -> List[ItemUpdateInfo]:
    """
    Process batches that have completed and fetch their item results.

    Args:
        server: The SyncServer instance
        batch_results: Results from polling batch statuses
        metrics: Metrics collection object

    Returns:
        List of item updates to apply
    """
    item_update_tasks = []
    # Process each top-level polling result
    for batch_id, new_status, maybe_batch_resp in batch_results:
        if not maybe_batch_resp:
            # A missing response means fetch_batch_status hit a retrieval error
            # (it returns JobStatus.running with a None response in that case).
            if new_status == JobStatus.running:
                metrics.running_count += 1
            logger.warning(f"[Poll BatchJob] Batch {batch_id}: JobStatus was {new_status} and no batch response was found.")
            continue
        if new_status == JobStatus.completed:
            metrics.completed_count += 1
            batch_resp_id = maybe_batch_resp.id  # The Anthropic-assigned batch ID
            # Queue an async call to fetch item results for this batch
            item_update_tasks.append(fetch_batch_items(server, batch_id, batch_resp_id))
        elif new_status == JobStatus.running:
            metrics.running_count += 1
    # Launch all item update tasks concurrently
    # return_exceptions=True so one failed batch does not abort the others.
    concurrent_results = await asyncio.gather(*item_update_tasks, return_exceptions=True)
    # Flatten and filter the results
    item_updates = []
    for result in concurrent_results:
        if isinstance(result, Exception):
            logger.error(f"[Poll BatchJob] A fetch_batch_items task failed with: {result}")
        elif isinstance(result, list):
            item_updates.extend(result)
    logger.info(f"[Poll BatchJob] Collected a total of {len(item_updates)} item update(s) from completed batches.")
    return item_updates
async def poll_running_llm_batches(server: "SyncServer") -> None:
    """
    Cron job to poll all running LLM batch jobs and update their polling responses in bulk.

    Steps:
        1. Fetch currently running batch jobs
        2. Filter Anthropic only
        3. Retrieve updated top-level polling info concurrently
        4. Bulk update LLMBatchJob statuses
        5. For each completed batch, call .results(...) to get item-level results
        6. Bulk update all matching LLMBatchItem records by (batch_id, agent_id)
        7. Log telemetry about success/fail
    """
    # Initialize metrics tracking
    metrics = BatchPollingMetrics()
    logger.info("[Poll BatchJob] Starting poll_running_llm_batches job")
    try:
        # 1. Retrieve running batch jobs
        batches = server.batch_manager.list_running_batches()
        metrics.total_batches = len(batches)
        # TODO: Expand to more providers
        # 2. Filter for Anthropic jobs only
        anthropic_batch_jobs = [b for b in batches if b.llm_provider == ProviderType.anthropic]
        metrics.anthropic_batches = len(anthropic_batch_jobs)
        # 3-4. Poll for batch updates and bulk update statuses
        batch_results = await poll_batch_updates(server, anthropic_batch_jobs, metrics)
        # 5. Process completed batches and fetch item results
        item_updates = await process_completed_batches(server, batch_results, metrics)
        # 6. Bulk update all items for newly completed batch(es)
        if item_updates:
            metrics.updated_items_count = len(item_updates)
            server.batch_manager.bulk_update_batch_items_by_agent(item_updates)
        else:
            logger.info("[Poll BatchJob] No item-level updates needed.")
    except Exception as e:
        # Broad catch is deliberate: this is a cron entry point, so failures are
        # logged rather than propagated to the scheduler.
        logger.exception("[Poll BatchJob] Unhandled error in poll_running_llm_batches", exc_info=e)
    finally:
        # 7. Log metrics summary
        metrics.log_summary()

28
letta/jobs/scheduler.py Normal file
View File

@@ -0,0 +1,28 @@
import datetime
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from letta.jobs.llm_batch_job_polling import poll_running_llm_batches
from letta.server.server import SyncServer
from letta.settings import settings
scheduler = AsyncIOScheduler()
def start_cron_jobs(server: SyncServer):
    """Initialize cron jobs.

    Registers the LLM-batch polling job on the module-level AsyncIOScheduler
    and starts the scheduler.

    Args:
        server: The SyncServer instance passed to each scheduled invocation.
    """
    scheduler.add_job(
        poll_running_llm_batches,
        args=[server],
        trigger=IntervalTrigger(seconds=settings.poll_running_llm_batches_interval_seconds),
        # Fire immediately on startup, then at the configured interval.
        # Use timezone.utc rather than datetime.UTC: the latter only exists on
        # Python 3.11+, and the rest of this codebase already uses
        # datetime.timezone.utc (e.g. in LLMBatchManager).
        next_run_time=datetime.datetime.now(datetime.timezone.utc),
        id="poll_llm_batches",
        name="Poll LLM API batch jobs and update status",
        replace_existing=True,
    )
    scheduler.start()
def shutdown_cron_scheduler():
    """Stop the module-level AsyncIOScheduler (intended for application shutdown)."""
    scheduler.shutdown()

10
letta/jobs/types.py Normal file
View File

@@ -0,0 +1,10 @@
from typing import Optional, Tuple
from anthropic.types.beta.messages import BetaMessageBatch, BetaMessageBatchIndividualResponse
from letta.schemas.enums import JobStatus
# Internal identifier of an LLMBatchJob record.
BatchId = str
# Identifier of an agent; in batch items this is carried in the provider's
# custom_id field (see fetch_batch_items in the polling job).
AgentId = str
# Result of polling one batch: (batch_id, mapped_status, raw_provider_response_or_None).
BatchPollingResult = Tuple[BatchId, JobStatus, Optional[BetaMessageBatch]]
# One item-level update: (batch_id, agent_id, mapped_status, raw_item_result).
ItemUpdateInfo = Tuple[BatchId, AgentId, JobStatus, BetaMessageBatchIndividualResponse]

View File

@@ -32,6 +32,7 @@ class JobStatus(str, Enum):
completed = "completed"
failed = "failed"
pending = "pending"
cancelled = "cancelled"
class AgentStepStatus(str, Enum):

View File

@@ -16,6 +16,7 @@ from starlette.middleware.cors import CORSMiddleware
from letta.__init__ import __version__
from letta.constants import ADMIN_PREFIX, API_PREFIX, OPENAI_API_PREFIX
from letta.errors import BedrockPermissionError, LettaAgentNotFoundError, LettaUserNotFoundError
from letta.jobs.scheduler import shutdown_cron_scheduler, start_cron_jobs
from letta.log import get_logger
from letta.orm.errors import DatabaseTimeoutError, ForeignKeyConstraintViolationError, NoResultFound, UniqueConstraintViolationError
from letta.schemas.letta_message import create_letta_message_union_schema
@@ -144,6 +145,12 @@ def create_application() -> "FastAPI":
executor = concurrent.futures.ThreadPoolExecutor(max_workers=settings.event_loop_threadpool_max_workers)
loop.set_default_executor(executor)
@app.on_event("startup")
def on_startup():
global server
start_cron_jobs(server)
@app.on_event("shutdown")
def shutdown_mcp_clients():
global server
@@ -159,6 +166,10 @@ def create_application() -> "FastAPI":
t.start()
t.join()
@app.on_event("shutdown")
def shutdown_scheduler():
shutdown_cron_scheduler()
@app.exception_handler(Exception)
async def generic_error_handler(request: Request, exc: Exception):
# Log the actual error for debugging

View File

@@ -8,6 +8,7 @@ from abc import abstractmethod
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from anthropic import AsyncAnthropic
from composio.client import Composio
from composio.client.collections import ActionModel, AppModel
from fastapi import HTTPException
@@ -352,6 +353,9 @@ class SyncServer(Server):
self._llm_config_cache = {}
self._embedding_config_cache = {}
# TODO: Replace this with the Anthropic client we have in house
self.anthropic_async_client = AsyncAnthropic()
def load_agent(self, agent_id: str, actor: User, interface: Union[AgentInterface, None] = None) -> Agent:
"""Updated method to load agents from persisted storage"""
agent_state = self.agent_manager.get_agent_by_id(agent_id=agent_id, actor=actor)

View File

@@ -1,13 +1,15 @@
import datetime
from typing import Optional
from typing import List, Optional
from anthropic.types.beta.messages import BetaMessageBatch, BetaMessageBatchIndividualResponse
from sqlalchemy import tuple_
from letta.jobs.types import BatchPollingResult, ItemUpdateInfo
from letta.log import get_logger
from letta.orm.llm_batch_items import LLMBatchItem
from letta.orm.llm_batch_job import LLMBatchJob
from letta.schemas.agent import AgentStepState
from letta.schemas.enums import AgentStepStatus, JobStatus
from letta.schemas.enums import AgentStepStatus, JobStatus, ProviderType
from letta.schemas.llm_batch_job import LLMBatchItem as PydanticLLMBatchItem
from letta.schemas.llm_batch_job import LLMBatchJob as PydanticLLMBatchJob
from letta.schemas.llm_config import LLMConfig
@@ -28,7 +30,7 @@ class LLMBatchManager:
@enforce_types
def create_batch_request(
self,
llm_provider: str,
llm_provider: ProviderType,
create_batch_response: BetaMessageBatch,
actor: PydanticUser,
status: JobStatus = JobStatus.created,
@@ -45,7 +47,7 @@ class LLMBatchManager:
return batch.to_pydantic()
@enforce_types
def get_batch_request_by_id(self, batch_id: str, actor: PydanticUser) -> PydanticLLMBatchJob:
def get_batch_job_by_id(self, batch_id: str, actor: Optional[PydanticUser] = None) -> PydanticLLMBatchJob:
"""Retrieve a single batch job by ID."""
with self.session_maker() as session:
batch = LLMBatchJob.read(db_session=session, identifier=batch_id, actor=actor)
@@ -56,7 +58,7 @@ class LLMBatchManager:
self,
batch_id: str,
status: JobStatus,
actor: PydanticUser,
actor: Optional[PydanticUser] = None,
latest_polling_response: Optional[BetaMessageBatch] = None,
) -> PydanticLLMBatchJob:
"""Update a batch jobs status and optionally its polling response."""
@@ -65,7 +67,34 @@ class LLMBatchManager:
batch.status = status
batch.latest_polling_response = latest_polling_response
batch.last_polled_at = datetime.datetime.now(datetime.timezone.utc)
return batch.update(db_session=session, actor=actor).to_pydantic()
batch = batch.update(db_session=session, actor=actor)
return batch.to_pydantic()
def bulk_update_batch_statuses(
    self,
    updates: List[BatchPollingResult],
) -> None:
    """
    Efficiently update many LLMBatchJob rows. This is used by the cron jobs.

    `updates` = [(batch_id, new_status, polling_response_or_None), …]
    """
    polled_at = datetime.datetime.now(datetime.timezone.utc)
    # Translate each polling result into a PK-keyed mapping for bulk update.
    mappings = [
        {
            "id": batch_id,
            "status": status,
            "latest_polling_response": response,
            "last_polled_at": polled_at,
        }
        for batch_id, status, response in updates
    ]
    with self.session_maker() as session:
        session.bulk_update_mappings(LLMBatchJob, mappings)
        session.commit()
@enforce_types
def delete_batch_request(self, batch_id: str, actor: PydanticUser) -> None:
@@ -74,6 +103,18 @@ class LLMBatchManager:
batch = LLMBatchJob.read(db_session=session, identifier=batch_id, actor=actor)
batch.hard_delete(db_session=session, actor=actor)
@enforce_types
def list_running_batches(self, actor: Optional[PydanticUser] = None) -> List[PydanticLLMBatchJob]:
    """Return all running LLM batch jobs, optionally filtered by actor's organization."""
    with self.session_maker() as session:
        query = session.query(LLMBatchJob).filter(LLMBatchJob.status == JobStatus.running)
        if actor is not None:
            # Scope the listing to the caller's organization when an actor is given.
            query = query.filter(LLMBatchJob.organization_id == actor.organization_id)
        return [row.to_pydantic() for row in query.all()]
@enforce_types
def create_batch_item(
self,
@@ -131,6 +172,56 @@ class LLMBatchManager:
return item.update(db_session=session, actor=actor).to_pydantic()
def bulk_update_batch_items_by_agent(
    self,
    updates: List[ItemUpdateInfo],
) -> None:
    """
    Efficiently update LLMBatchItem rows by (batch_id, agent_id).

    Args:
        updates: List of tuples:
            (batch_id, agent_id, new_request_status, batch_request_result)
    """
    with self.session_maker() as session:
        # bulk_update_mappings requires each row's primary key, so first resolve
        # every (batch_id, agent_id) pair to its PK with a single tuple-IN query.
        wanted_pairs = [(batch_id, agent_id) for batch_id, agent_id, *_ in updates]
        rows = (
            session.query(LLMBatchItem.id, LLMBatchItem.batch_id, LLMBatchItem.agent_id)
            .filter(tuple_(LLMBatchItem.batch_id, LLMBatchItem.agent_id).in_(wanted_pairs))
            .all()
        )
        pk_by_pair = {(row_batch_id, row_agent_id): row_pk for row_pk, row_batch_id, row_agent_id in rows}
        # Build PK-based mappings; pairs with no matching row are skipped.
        mappings = []
        for batch_id, agent_id, new_status, new_result in updates:
            pk = pk_by_pair.get((batch_id, agent_id))
            if not pk:
                # Nonexistent or mismatch → skip
                continue
            mappings.append(
                {
                    "id": pk,
                    "request_status": new_status,
                    "batch_request_result": new_result,
                }
            )
        if mappings:
            session.bulk_update_mappings(LLMBatchItem, mappings)
            session.commit()
@enforce_types
def delete_batch_item(self, item_id: str, actor: PydanticUser) -> None:
"""Hard delete a batch item by ID."""

View File

@@ -203,6 +203,9 @@ class Settings(BaseSettings):
httpx_max_keepalive_connections: int = 500
httpx_keepalive_expiry: float = 120.0
# cron job parameters
poll_running_llm_batches_interval_seconds: int = 5 * 60
@property
def letta_pg_uri(self) -> str:
if self.pg_uri:

98
poetry.lock generated
View File

@@ -215,6 +215,33 @@ files = [
{file = "appnope-0.1.4.tar.gz", hash = "sha256:1de3860566df9caf38f01f86f65e0e13e379af54f9e4bee1e66b48f2efffd1ee"},
]
[[package]]
name = "apscheduler"
version = "3.11.0"
description = "In-process task scheduler with Cron-like capabilities"
optional = false
python-versions = ">=3.8"
files = [
{file = "APScheduler-3.11.0-py3-none-any.whl", hash = "sha256:fc134ca32e50f5eadcc4938e3a4545ab19131435e851abb40b34d63d5141c6da"},
{file = "apscheduler-3.11.0.tar.gz", hash = "sha256:4c622d250b0955a65d5d0eb91c33e6d43fd879834bf541e0a18661ae60460133"},
]
[package.dependencies]
tzlocal = ">=3.0"
[package.extras]
doc = ["packaging", "sphinx", "sphinx-rtd-theme (>=1.3.0)"]
etcd = ["etcd3", "protobuf (<=3.21.0)"]
gevent = ["gevent"]
mongodb = ["pymongo (>=3.0)"]
redis = ["redis (>=3.0)"]
rethinkdb = ["rethinkdb (>=2.4.0)"]
sqlalchemy = ["sqlalchemy (>=1.4)"]
test = ["APScheduler[etcd,mongodb,redis,rethinkdb,sqlalchemy,tornado,zookeeper]", "PySide6", "anyio (>=4.5.2)", "gevent", "pytest", "pytz", "twisted"]
tornado = ["tornado (>=4.3)"]
twisted = ["twisted"]
zookeeper = ["kazoo"]
[[package]]
name = "argcomplete"
version = "3.6.2"
@@ -521,10 +548,6 @@ files = [
{file = "Brotli-1.1.0-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:a37b8f0391212d29b3a91a799c8e4a2855e0576911cdfb2515487e30e322253d"},
{file = "Brotli-1.1.0-cp310-cp310-musllinux_1_1_ppc64le.whl", hash = "sha256:e84799f09591700a4154154cab9787452925578841a94321d5ee8fb9a9a328f0"},
{file = "Brotli-1.1.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:f66b5337fa213f1da0d9000bc8dc0cb5b896b726eefd9c6046f699b169c41b9e"},
{file = "Brotli-1.1.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:5dab0844f2cf82be357a0eb11a9087f70c5430b2c241493fc122bb6f2bb0917c"},
{file = "Brotli-1.1.0-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:e4fe605b917c70283db7dfe5ada75e04561479075761a0b3866c081d035b01c1"},
{file = "Brotli-1.1.0-cp310-cp310-musllinux_1_2_ppc64le.whl", hash = "sha256:1e9a65b5736232e7a7f91ff3d02277f11d339bf34099a56cdab6a8b3410a02b2"},
{file = "Brotli-1.1.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:58d4b711689366d4a03ac7957ab8c28890415e267f9b6589969e74b6e42225ec"},
{file = "Brotli-1.1.0-cp310-cp310-win32.whl", hash = "sha256:be36e3d172dc816333f33520154d708a2657ea63762ec16b62ece02ab5e4daf2"},
{file = "Brotli-1.1.0-cp310-cp310-win_amd64.whl", hash = "sha256:0c6244521dda65ea562d5a69b9a26120769b7a9fb3db2fe9545935ed6735b128"},
{file = "Brotli-1.1.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:a3daabb76a78f829cafc365531c972016e4aa8d5b4bf60660ad8ecee19df7ccc"},
@@ -537,14 +560,8 @@ files = [
{file = "Brotli-1.1.0-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:19c116e796420b0cee3da1ccec3b764ed2952ccfcc298b55a10e5610ad7885f9"},
{file = "Brotli-1.1.0-cp311-cp311-musllinux_1_1_ppc64le.whl", hash = "sha256:510b5b1bfbe20e1a7b3baf5fed9e9451873559a976c1a78eebaa3b86c57b4265"},
{file = "Brotli-1.1.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:a1fd8a29719ccce974d523580987b7f8229aeace506952fa9ce1d53a033873c8"},
{file = "Brotli-1.1.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:c247dd99d39e0338a604f8c2b3bc7061d5c2e9e2ac7ba9cc1be5a69cb6cd832f"},
{file = "Brotli-1.1.0-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:1b2c248cd517c222d89e74669a4adfa5577e06ab68771a529060cf5a156e9757"},
{file = "Brotli-1.1.0-cp311-cp311-musllinux_1_2_ppc64le.whl", hash = "sha256:2a24c50840d89ded6c9a8fdc7b6ed3692ed4e86f1c4a4a938e1e92def92933e0"},
{file = "Brotli-1.1.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:f31859074d57b4639318523d6ffdca586ace54271a73ad23ad021acd807eb14b"},
{file = "Brotli-1.1.0-cp311-cp311-win32.whl", hash = "sha256:39da8adedf6942d76dc3e46653e52df937a3c4d6d18fdc94a7c29d263b1f5b50"},
{file = "Brotli-1.1.0-cp311-cp311-win_amd64.whl", hash = "sha256:aac0411d20e345dc0920bdec5548e438e999ff68d77564d5e9463a7ca9d3e7b1"},
{file = "Brotli-1.1.0-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:32d95b80260d79926f5fab3c41701dbb818fde1c9da590e77e571eefd14abe28"},
{file = "Brotli-1.1.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:b760c65308ff1e462f65d69c12e4ae085cff3b332d894637f6273a12a482d09f"},
{file = "Brotli-1.1.0-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:316cc9b17edf613ac76b1f1f305d2a748f1b976b033b049a6ecdfd5612c70409"},
{file = "Brotli-1.1.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:caf9ee9a5775f3111642d33b86237b05808dafcd6268faa492250e9b78046eb2"},
{file = "Brotli-1.1.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:70051525001750221daa10907c77830bc889cb6d865cc0b813d9db7fefc21451"},
@@ -555,24 +572,8 @@ files = [
{file = "Brotli-1.1.0-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:4093c631e96fdd49e0377a9c167bfd75b6d0bad2ace734c6eb20b348bc3ea180"},
{file = "Brotli-1.1.0-cp312-cp312-musllinux_1_1_ppc64le.whl", hash = "sha256:7e4c4629ddad63006efa0ef968c8e4751c5868ff0b1c5c40f76524e894c50248"},
{file = "Brotli-1.1.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:861bf317735688269936f755fa136a99d1ed526883859f86e41a5d43c61d8966"},
{file = "Brotli-1.1.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:87a3044c3a35055527ac75e419dfa9f4f3667a1e887ee80360589eb8c90aabb9"},
{file = "Brotli-1.1.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:c5529b34c1c9d937168297f2c1fde7ebe9ebdd5e121297ff9c043bdb2ae3d6fb"},
{file = "Brotli-1.1.0-cp312-cp312-musllinux_1_2_ppc64le.whl", hash = "sha256:ca63e1890ede90b2e4454f9a65135a4d387a4585ff8282bb72964fab893f2111"},
{file = "Brotli-1.1.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:e79e6520141d792237c70bcd7a3b122d00f2613769ae0cb61c52e89fd3443839"},
{file = "Brotli-1.1.0-cp312-cp312-win32.whl", hash = "sha256:5f4d5ea15c9382135076d2fb28dde923352fe02951e66935a9efaac8f10e81b0"},
{file = "Brotli-1.1.0-cp312-cp312-win_amd64.whl", hash = "sha256:906bc3a79de8c4ae5b86d3d75a8b77e44404b0f4261714306e3ad248d8ab0951"},
{file = "Brotli-1.1.0-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:8bf32b98b75c13ec7cf774164172683d6e7891088f6316e54425fde1efc276d5"},
{file = "Brotli-1.1.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:7bc37c4d6b87fb1017ea28c9508b36bbcb0c3d18b4260fcdf08b200c74a6aee8"},
{file = "Brotli-1.1.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3c0ef38c7a7014ffac184db9e04debe495d317cc9c6fb10071f7fefd93100a4f"},
{file = "Brotli-1.1.0-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:91d7cc2a76b5567591d12c01f019dd7afce6ba8cba6571187e21e2fc418ae648"},
{file = "Brotli-1.1.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a93dde851926f4f2678e704fadeb39e16c35d8baebd5252c9fd94ce8ce68c4a0"},
{file = "Brotli-1.1.0-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f0db75f47be8b8abc8d9e31bc7aad0547ca26f24a54e6fd10231d623f183d089"},
{file = "Brotli-1.1.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:6967ced6730aed543b8673008b5a391c3b1076d834ca438bbd70635c73775368"},
{file = "Brotli-1.1.0-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:7eedaa5d036d9336c95915035fb57422054014ebdeb6f3b42eac809928e40d0c"},
{file = "Brotli-1.1.0-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:d487f5432bf35b60ed625d7e1b448e2dc855422e87469e3f450aa5552b0eb284"},
{file = "Brotli-1.1.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:832436e59afb93e1836081a20f324cb185836c617659b07b129141a8426973c7"},
{file = "Brotli-1.1.0-cp313-cp313-win32.whl", hash = "sha256:43395e90523f9c23a3d5bdf004733246fba087f2948f87ab28015f12359ca6a0"},
{file = "Brotli-1.1.0-cp313-cp313-win_amd64.whl", hash = "sha256:9011560a466d2eb3f5a6e4929cf4a09be405c64154e12df0dd72713f6500e32b"},
{file = "Brotli-1.1.0-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:a090ca607cbb6a34b0391776f0cb48062081f5f60ddcce5d11838e67a01928d1"},
{file = "Brotli-1.1.0-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2de9d02f5bda03d27ede52e8cfe7b865b066fa49258cbab568720aa5be80a47d"},
{file = "Brotli-1.1.0-cp36-cp36m-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:2333e30a5e00fe0fe55903c8832e08ee9c3b1382aacf4db26664a16528d51b4b"},
@@ -582,10 +583,6 @@ files = [
{file = "Brotli-1.1.0-cp36-cp36m-musllinux_1_1_i686.whl", hash = "sha256:fd5f17ff8f14003595ab414e45fce13d073e0762394f957182e69035c9f3d7c2"},
{file = "Brotli-1.1.0-cp36-cp36m-musllinux_1_1_ppc64le.whl", hash = "sha256:069a121ac97412d1fe506da790b3e69f52254b9df4eb665cd42460c837193354"},
{file = "Brotli-1.1.0-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = "sha256:e93dfc1a1165e385cc8239fab7c036fb2cd8093728cbd85097b284d7b99249a2"},
{file = "Brotli-1.1.0-cp36-cp36m-musllinux_1_2_aarch64.whl", hash = "sha256:aea440a510e14e818e67bfc4027880e2fb500c2ccb20ab21c7a7c8b5b4703d75"},
{file = "Brotli-1.1.0-cp36-cp36m-musllinux_1_2_i686.whl", hash = "sha256:6974f52a02321b36847cd19d1b8e381bf39939c21efd6ee2fc13a28b0d99348c"},
{file = "Brotli-1.1.0-cp36-cp36m-musllinux_1_2_ppc64le.whl", hash = "sha256:a7e53012d2853a07a4a79c00643832161a910674a893d296c9f1259859a289d2"},
{file = "Brotli-1.1.0-cp36-cp36m-musllinux_1_2_x86_64.whl", hash = "sha256:d7702622a8b40c49bffb46e1e3ba2e81268d5c04a34f460978c6b5517a34dd52"},
{file = "Brotli-1.1.0-cp36-cp36m-win32.whl", hash = "sha256:a599669fd7c47233438a56936988a2478685e74854088ef5293802123b5b2460"},
{file = "Brotli-1.1.0-cp36-cp36m-win_amd64.whl", hash = "sha256:d143fd47fad1db3d7c27a1b1d66162e855b5d50a89666af46e1679c496e8e579"},
{file = "Brotli-1.1.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:11d00ed0a83fa22d29bc6b64ef636c4552ebafcef57154b4ddd132f5638fbd1c"},
@@ -597,10 +594,6 @@ files = [
{file = "Brotli-1.1.0-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:919e32f147ae93a09fe064d77d5ebf4e35502a8df75c29fb05788528e330fe74"},
{file = "Brotli-1.1.0-cp37-cp37m-musllinux_1_1_ppc64le.whl", hash = "sha256:23032ae55523cc7bccb4f6a0bf368cd25ad9bcdcc1990b64a647e7bbcce9cb5b"},
{file = "Brotli-1.1.0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:224e57f6eac61cc449f498cc5f0e1725ba2071a3d4f48d5d9dffba42db196438"},
{file = "Brotli-1.1.0-cp37-cp37m-musllinux_1_2_aarch64.whl", hash = "sha256:cb1dac1770878ade83f2ccdf7d25e494f05c9165f5246b46a621cc849341dc01"},
{file = "Brotli-1.1.0-cp37-cp37m-musllinux_1_2_i686.whl", hash = "sha256:3ee8a80d67a4334482d9712b8e83ca6b1d9bc7e351931252ebef5d8f7335a547"},
{file = "Brotli-1.1.0-cp37-cp37m-musllinux_1_2_ppc64le.whl", hash = "sha256:5e55da2c8724191e5b557f8e18943b1b4839b8efc3ef60d65985bcf6f587dd38"},
{file = "Brotli-1.1.0-cp37-cp37m-musllinux_1_2_x86_64.whl", hash = "sha256:d342778ef319e1026af243ed0a07c97acf3bad33b9f29e7ae6a1f68fd083e90c"},
{file = "Brotli-1.1.0-cp37-cp37m-win32.whl", hash = "sha256:587ca6d3cef6e4e868102672d3bd9dc9698c309ba56d41c2b9c85bbb903cdb95"},
{file = "Brotli-1.1.0-cp37-cp37m-win_amd64.whl", hash = "sha256:2954c1c23f81c2eaf0b0717d9380bd348578a94161a65b3a2afc62c86467dd68"},
{file = "Brotli-1.1.0-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:efa8b278894b14d6da122a72fefcebc28445f2d3f880ac59d46c90f4c13be9a3"},
@@ -613,10 +606,6 @@ files = [
{file = "Brotli-1.1.0-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:1ab4fbee0b2d9098c74f3057b2bc055a8bd92ccf02f65944a241b4349229185a"},
{file = "Brotli-1.1.0-cp38-cp38-musllinux_1_1_ppc64le.whl", hash = "sha256:141bd4d93984070e097521ed07e2575b46f817d08f9fa42b16b9b5f27b5ac088"},
{file = "Brotli-1.1.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:fce1473f3ccc4187f75b4690cfc922628aed4d3dd013d047f95a9b3919a86596"},
{file = "Brotli-1.1.0-cp38-cp38-musllinux_1_2_aarch64.whl", hash = "sha256:d2b35ca2c7f81d173d2fadc2f4f31e88cc5f7a39ae5b6db5513cf3383b0e0ec7"},
{file = "Brotli-1.1.0-cp38-cp38-musllinux_1_2_i686.whl", hash = "sha256:af6fa6817889314555aede9a919612b23739395ce767fe7fcbea9a80bf140fe5"},
{file = "Brotli-1.1.0-cp38-cp38-musllinux_1_2_ppc64le.whl", hash = "sha256:2feb1d960f760a575dbc5ab3b1c00504b24caaf6986e2dc2b01c09c87866a943"},
{file = "Brotli-1.1.0-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:4410f84b33374409552ac9b6903507cdb31cd30d2501fc5ca13d18f73548444a"},
{file = "Brotli-1.1.0-cp38-cp38-win32.whl", hash = "sha256:db85ecf4e609a48f4b29055f1e144231b90edc90af7481aa731ba2d059226b1b"},
{file = "Brotli-1.1.0-cp38-cp38-win_amd64.whl", hash = "sha256:3d7954194c36e304e1523f55d7042c59dc53ec20dd4e9ea9d151f1b62b4415c0"},
{file = "Brotli-1.1.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:5fb2ce4b8045c78ebbc7b8f3c15062e435d47e7393cc57c25115cfd49883747a"},
@@ -629,10 +618,6 @@ files = [
{file = "Brotli-1.1.0-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:949f3b7c29912693cee0afcf09acd6ebc04c57af949d9bf77d6101ebb61e388c"},
{file = "Brotli-1.1.0-cp39-cp39-musllinux_1_1_ppc64le.whl", hash = "sha256:89f4988c7203739d48c6f806f1e87a1d96e0806d44f0fba61dba81392c9e474d"},
{file = "Brotli-1.1.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:de6551e370ef19f8de1807d0a9aa2cdfdce2e85ce88b122fe9f6b2b076837e59"},
{file = "Brotli-1.1.0-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:0737ddb3068957cf1b054899b0883830bb1fec522ec76b1098f9b6e0f02d9419"},
{file = "Brotli-1.1.0-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:4f3607b129417e111e30637af1b56f24f7a49e64763253bbc275c75fa887d4b2"},
{file = "Brotli-1.1.0-cp39-cp39-musllinux_1_2_ppc64le.whl", hash = "sha256:6c6e0c425f22c1c719c42670d561ad682f7bfeeef918edea971a79ac5252437f"},
{file = "Brotli-1.1.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:494994f807ba0b92092a163a0a283961369a65f6cbe01e8891132b7a320e61eb"},
{file = "Brotli-1.1.0-cp39-cp39-win32.whl", hash = "sha256:f0d8a7a6b5983c2496e364b969f0e526647a06b075d034f3297dc66f3b360c64"},
{file = "Brotli-1.1.0-cp39-cp39-win_amd64.whl", hash = "sha256:cdad5b9014d83ca68c25d2e9444e28e967ef16e80f6b436918c700c117a85467"},
{file = "Brotli-1.1.0.tar.gz", hash = "sha256:81de08ac11bcb85841e440c13611c00b67d3bf82698314928d0b676362546724"},
@@ -1056,9 +1041,9 @@ isort = ">=4.3.21,<6.0"
jinja2 = ">=2.10.1,<4.0"
packaging = "*"
pydantic = [
{version = ">=1.10.0,<2.0.0 || >2.0.0,<2.0.1 || >2.0.1,<2.4.0 || >2.4.0,<3.0", extras = ["email"], markers = "python_version >= \"3.12\" and python_version < \"4.0\""},
{version = ">=1.10.0,<2.4.0 || >2.4.0,<3.0", extras = ["email"], markers = "python_version >= \"3.11\" and python_version < \"3.12\""},
{version = ">=1.9.0,<2.4.0 || >2.4.0,<3.0", extras = ["email"], markers = "python_version >= \"3.10\" and python_version < \"3.11\""},
{version = ">=1.10.0,<2.0.0 || >2.0.0,<2.0.1 || >2.0.1,<2.4.0 || >2.4.0,<3.0", extras = ["email"], markers = "python_version >= \"3.12\" and python_version < \"4.0\""},
]
pyyaml = ">=6.0.1"
toml = {version = ">=0.10.0,<1.0.0", markers = "python_version < \"3.11\""}
@@ -3082,8 +3067,8 @@ psutil = ">=5.9.1"
pywin32 = {version = "*", markers = "sys_platform == \"win32\""}
pyzmq = ">=25.0.0"
requests = [
{version = ">=2.26.0", markers = "python_version <= \"3.11\""},
{version = ">=2.32.2", markers = "python_version > \"3.11\""},
{version = ">=2.26.0", markers = "python_version <= \"3.11\""},
]
setuptools = ">=70.0.0"
tomli = {version = ">=1.1.0", markers = "python_version < \"3.11\""}
@@ -3936,9 +3921,9 @@ files = [
[package.dependencies]
numpy = [
{version = ">=1.26.0", markers = "python_version >= \"3.12\""},
{version = ">=1.23.2", markers = "python_version == \"3.11\""},
{version = ">=1.22.4", markers = "python_version < \"3.11\""},
{version = ">=1.26.0", markers = "python_version >= \"3.12\""},
]
python-dateutil = ">=2.8.2"
pytz = ">=2020.1"
@@ -4424,7 +4409,6 @@ files = [
{file = "psycopg2-2.9.10-cp311-cp311-win_amd64.whl", hash = "sha256:0435034157049f6846e95103bd8f5a668788dd913a7c30162ca9503fdf542cb4"},
{file = "psycopg2-2.9.10-cp312-cp312-win32.whl", hash = "sha256:65a63d7ab0e067e2cdb3cf266de39663203d38d6a8ed97f5ca0cb315c73fe067"},
{file = "psycopg2-2.9.10-cp312-cp312-win_amd64.whl", hash = "sha256:4a579d6243da40a7b3182e0430493dbd55950c493d8c68f4eec0b302f6bbf20e"},
{file = "psycopg2-2.9.10-cp313-cp313-win_amd64.whl", hash = "sha256:91fd603a2155da8d0cfcdbf8ab24a2d54bca72795b90d2a3ed2b6da8d979dee2"},
{file = "psycopg2-2.9.10-cp39-cp39-win32.whl", hash = "sha256:9d5b3b94b79a844a986d029eee38998232451119ad653aea42bb9220a8c5066b"},
{file = "psycopg2-2.9.10-cp39-cp39-win_amd64.whl", hash = "sha256:88138c8dedcbfa96408023ea2b0c369eda40fe5d75002c0964c78f46f11fa442"},
{file = "psycopg2-2.9.10.tar.gz", hash = "sha256:12ec0b40b0273f95296233e8750441339298e6a572f7039da5b260e3c8b60e11"},
@@ -4484,7 +4468,6 @@ files = [
{file = "psycopg2_binary-2.9.10-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:bb89f0a835bcfc1d42ccd5f41f04870c1b936d8507c6df12b7737febc40f0909"},
{file = "psycopg2_binary-2.9.10-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:f0c2d907a1e102526dd2986df638343388b94c33860ff3bbe1384130828714b1"},
{file = "psycopg2_binary-2.9.10-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:f8157bed2f51db683f31306aa497311b560f2265998122abe1dce6428bd86567"},
{file = "psycopg2_binary-2.9.10-cp313-cp313-win_amd64.whl", hash = "sha256:27422aa5f11fbcd9b18da48373eb67081243662f9b46e6fd07c3eb46e4535142"},
{file = "psycopg2_binary-2.9.10-cp38-cp38-macosx_12_0_x86_64.whl", hash = "sha256:eb09aa7f9cecb45027683bb55aebaaf45a0df8bf6de68801a6afdc7947bb09d4"},
{file = "psycopg2_binary-2.9.10-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b73d6d7f0ccdad7bc43e6d34273f70d587ef62f824d7261c4ae9b8b1b6af90e8"},
{file = "psycopg2_binary-2.9.10-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ce5ab4bf46a211a8e924d307c1b1fcda82368586a19d0a24f8ae166f5c784864"},
@@ -5237,8 +5220,8 @@ grpcio = ">=1.41.0"
grpcio-tools = ">=1.41.0"
httpx = {version = ">=0.20.0", extras = ["http2"]}
numpy = [
{version = ">=1.21", markers = "python_version >= \"3.10\" and python_version < \"3.12\""},
{version = ">=1.26", markers = "python_version == \"3.12\""},
{version = ">=1.21", markers = "python_version >= \"3.10\" and python_version < \"3.12\""},
]
portalocker = ">=2.7.0,<3.0.0"
pydantic = ">=1.10.8"
@@ -6219,6 +6202,23 @@ files = [
{file = "tzdata-2025.2.tar.gz", hash = "sha256:b60a638fcc0daffadf82fe0f57e53d06bdec2f36c4df66280ae79bce6bd6f2b9"},
]
[[package]]
name = "tzlocal"
version = "5.3.1"
description = "tzinfo object for the local timezone"
optional = false
python-versions = ">=3.9"
files = [
{file = "tzlocal-5.3.1-py3-none-any.whl", hash = "sha256:eb1a66c3ef5847adf7a834f1be0800581b683b5608e74f86ecbcef8ab91bb85d"},
{file = "tzlocal-5.3.1.tar.gz", hash = "sha256:cceffc7edecefea1f595541dbd6e990cb1ea3d19bf01b2809f362a03dd7921fd"},
]
[package.dependencies]
tzdata = {version = "*", markers = "platform_system == \"Windows\""}
[package.extras]
devenv = ["check-manifest", "pytest (>=4.3)", "pytest-cov", "pytest-mock (>=3.3)", "zest.releaser"]
[[package]]
name = "urllib3"
version = "2.3.0"
@@ -6819,4 +6819,4 @@ tests = ["wikipedia"]
[metadata]
lock-version = "2.0"
python-versions = "<3.14,>=3.10"
content-hash = "b104f1fe1ae3c5b2e1261105f1e4170d28fee235551c230b3603bb5494054bf1"
content-hash = "c7532fe22e86ca8602c0b27be85020e0b139ec521cd5b0dc94b180113ede41c4"

View File

@@ -88,6 +88,7 @@ boto3 = {version = "^1.36.24", optional = true}
datamodel-code-generator = {extras = ["http"], version = "^0.25.0"}
mcp = "^1.3.0"
firecrawl-py = "^1.15.0"
apscheduler = "^3.11.0"
[tool.poetry.extras]

View File

@@ -0,0 +1,331 @@
import os
import threading
import time
from datetime import datetime, timezone
from unittest.mock import AsyncMock
import pytest
from anthropic.types import BetaErrorResponse, BetaRateLimitError
from anthropic.types.beta import BetaMessage
from anthropic.types.beta.messages import (
BetaMessageBatch,
BetaMessageBatchErroredResult,
BetaMessageBatchIndividualResponse,
BetaMessageBatchRequestCounts,
BetaMessageBatchSucceededResult,
)
from dotenv import load_dotenv
from letta_client import Letta
from letta.config import LettaConfig
from letta.helpers import ToolRulesSolver
from letta.jobs.llm_batch_job_polling import poll_running_llm_batches
from letta.orm import Base
from letta.schemas.agent import AgentStepState
from letta.schemas.enums import JobStatus, ProviderType
from letta.schemas.llm_config import LLMConfig
from letta.schemas.tool_rule import InitToolRule
from letta.server.db import db_context
from letta.server.server import SyncServer
# --- Server and Database Management --- #
@pytest.fixture(autouse=True)
def _clear_tables():
    """Wipe batch-related tables before every test so each run starts clean."""
    target_tables = {"llm_batch_job", "llm_batch_items"}
    with db_context() as session:
        # Walk tables in reverse dependency order so FK constraints are respected.
        for table in reversed(Base.metadata.sorted_tables):
            if table.name in target_tables:
                session.execute(table.delete())
        session.commit()
def _run_server():
    """Start the Letta REST server in the current thread (blocks).

    Intended to be launched via ``threading.Thread(target=_run_server, daemon=True)``
    from the ``server_url`` fixture.
    """
    load_dotenv()  # load .env first so credentials/config exist before the app module is imported
    from letta.server.rest_api.app import start_server  # deferred import: only pay the app import cost when a server is actually needed
    start_server(debug=True)
@pytest.fixture(scope="session")
def server_url():
    """Return the base URL of a running Letta server, booting one in-process if none is configured."""
    configured = os.getenv("LETTA_SERVER_URL")
    if not configured:
        # No external server configured: start one on a background daemon thread.
        threading.Thread(target=_run_server, daemon=True).start()
        time.sleep(5)  # give the server a moment to come up
    # Preserve original semantics: an explicitly-set (even empty) env value is returned as-is.
    return configured if configured is not None else "http://localhost:8283"
@pytest.fixture(scope="module")
def server():
    """Provide a module-scoped SyncServer backed by a freshly persisted config."""
    cfg = LettaConfig.load()
    print("CONFIG PATH", cfg.config_path)
    cfg.save()  # persist so the server picks up the same config
    return SyncServer()
@pytest.fixture(scope="session")
def client(server_url):
    """Session-scoped REST client pointed at the test server."""
    rest_client = Letta(base_url=server_url)
    return rest_client
# --- Dummy Response Factories --- #
def create_batch_response(batch_id: str, processing_status: str = "in_progress") -> BetaMessageBatch:
    """Build a dummy BetaMessageBatch with a frozen timestamp and fixed request counts.

    Args:
        batch_id: ID to stamp on the fake batch.
        processing_status: Anthropic-side status (e.g. "in_progress", "ended").
    """
    frozen_time = datetime(2024, 8, 20, 18, 37, 24, 100435, tzinfo=timezone.utc)
    counts = BetaMessageBatchRequestCounts(
        canceled=10,
        errored=30,
        expired=10,
        processing=100,
        succeeded=50,
    )
    return BetaMessageBatch(
        id=batch_id,
        archived_at=frozen_time,
        cancel_initiated_at=frozen_time,
        created_at=frozen_time,
        ended_at=frozen_time,
        expires_at=frozen_time,
        processing_status=processing_status,
        request_counts=counts,
        results_url=None,
        type="message_batch",
    )
def create_successful_response(custom_id: str) -> BetaMessageBatchIndividualResponse:
    """Build a dummy per-item batch response whose result is a successful assistant message."""
    assistant_message = BetaMessage(
        id="msg_abc123",
        role="assistant",
        type="message",
        model="claude-3-5-sonnet-20240620",
        content=[{"type": "text", "text": "hi!"}],
        usage={"input_tokens": 5, "output_tokens": 7},
        stop_reason="end_turn",
    )
    return BetaMessageBatchIndividualResponse(
        custom_id=custom_id,
        result=BetaMessageBatchSucceededResult(type="succeeded", message=assistant_message),
    )
def create_failed_response(custom_id: str) -> BetaMessageBatchIndividualResponse:
    """Build a dummy per-item batch response that failed with a rate-limit error."""
    rate_limit = BetaRateLimitError(type="rate_limit_error", message="Rate limit hit.")
    errored = BetaMessageBatchErroredResult(
        type="errored",
        error=BetaErrorResponse(type="error", error=rate_limit),
    )
    return BetaMessageBatchIndividualResponse(custom_id=custom_id, result=errored)
# --- Test Setup Helpers --- #
def create_test_agent(client, name, model="anthropic/claude-3-5-sonnet-20241022"):
    """Create a tagged test agent with the standard model/embedding configuration."""
    new_agent = client.agents.create(
        name=name,
        model=model,
        embedding="letta/letta-free",
        tags=["test_agents"],
        include_base_tools=True,
    )
    return new_agent
def create_test_batch_job(server, batch_response, default_user):
    """Register a running Anthropic batch job wrapping the given provider response."""
    return server.batch_manager.create_batch_request(
        status=JobStatus.running,
        llm_provider=ProviderType.anthropic,
        create_batch_response=batch_response,
        actor=default_user,
    )
def create_test_batch_item(server, batch_id, agent_id, default_user):
    """Create a test batch item for the given batch and agent.

    Args:
        server: SyncServer whose ``batch_manager`` persists the item.
        batch_id: ID of the parent LLM batch job.
        agent_id: Agent the item belongs to.
        default_user: Actor used for authorization.

    Returns:
        The newly created batch item record.
    """
    # Single source of truth for the model name; the handle is derived from it
    # (the original duplicated the literal and used a placeholder-free f-string).
    model_name = "claude-3-7-sonnet-latest"
    dummy_llm_config = LLMConfig(
        model=model_name,
        model_endpoint_type="anthropic",
        model_endpoint="https://api.anthropic.com/v1",
        context_window=32000,
        handle=f"anthropic/{model_name}",
        put_inner_thoughts_in_kwargs=True,
        max_tokens=4096,
    )
    # Minimal step state: one init rule forcing send_message as the first tool.
    common_step_state = AgentStepState(
        step_number=1, tool_rules_solver=ToolRulesSolver(tool_rules=[InitToolRule(tool_name="send_message")])
    )
    return server.batch_manager.create_batch_item(
        batch_id=batch_id,
        agent_id=agent_id,
        llm_config=dummy_llm_config,
        step_state=common_step_state,
        actor=default_user,
    )
def mock_anthropic_client(server, batch_a_resp, batch_b_resp, agent_b_id, agent_c_id):
    """Patch the server's async Anthropic client so batch retrieve/results return canned data."""
    known_batches = {batch_a_resp.id: batch_a_resp, batch_b_resp.id: batch_b_resp}

    async def fake_retrieve(batch_resp_id: str) -> BetaMessageBatch:
        # Stand-in for batches.retrieve(): look up the canned batch object.
        batch = known_batches.get(batch_resp_id)
        if batch is None:
            raise ValueError(f"Unknown batch response id: {batch_resp_id}")
        return batch

    def fake_results(batch_resp_id: str):
        # Only the ended batch (B) should ever have its results streamed.
        if batch_resp_id != batch_b_resp.id:
            raise RuntimeError("This test should never request the results for batch_a.")

        async def item_stream():
            yield create_successful_response(agent_b_id)
            yield create_failed_response(agent_c_id)

        return item_stream()

    batches_api = server.anthropic_async_client.beta.messages.batches
    batches_api.retrieve = AsyncMock(side_effect=fake_retrieve)
    batches_api.results = fake_results
# -----------------------------
# End-to-End Test
# -----------------------------
@pytest.mark.asyncio
async def test_polling_mixed_batch_jobs(client, default_user, server):
    """End-to-end test for polling batch jobs with mixed statuses and idempotency.

    Scenario:
    - Create two batch jobs:
      - Job A: single agent whose Anthropic batch stays "in_progress".
      - Job B: two agents whose batch has "ended" (one item succeeds, one fails).
    - Poll and verify:
      - Job A remains in "running" state.
      - Job B moves to "completed" state.
      - Job B's items reflect their individual success/failure statuses.
    - Idempotency: run additional polls and verify that
      - completed Job B is untouched (no status change, no re-poll), while
      - in-progress Job A continues to be polled, and
      - all batch items keep their final states.
    """
    # --- Step 1: Prepare test data ---
    # Create batch responses with different provider-side statuses
    batch_a_resp = create_batch_response("msgbatch_A", processing_status="in_progress")
    batch_b_resp = create_batch_response("msgbatch_B", processing_status="ended")
    # Create test agents (one for job A, two for job B)
    agent_a = create_test_agent(client, "agent_a")
    agent_b = create_test_agent(client, "agent_b")
    agent_c = create_test_agent(client, "agent_c")
    # --- Step 2: Create batch jobs ---
    job_a = create_test_batch_job(server, batch_a_resp, default_user)
    job_b = create_test_batch_job(server, batch_b_resp, default_user)
    # --- Step 3: Create batch items (A -> job A; B, C -> job B) ---
    item_a = create_test_batch_item(server, job_a.id, agent_a.id, default_user)
    item_b = create_test_batch_item(server, job_b.id, agent_b.id, default_user)
    item_c = create_test_batch_item(server, job_b.id, agent_c.id, default_user)
    # --- Step 4: Mock the Anthropic client ---
    mock_anthropic_client(server, batch_a_resp, batch_b_resp, agent_b.id, agent_c.id)
    # --- Step 5: Run the polling job twice (simulating periodic polling) ---
    await poll_running_llm_batches(server)
    await poll_running_llm_batches(server)
    # --- Step 6: Verify batch job status updates ---
    updated_job_a = server.batch_manager.get_batch_job_by_id(batch_id=job_a.id, actor=default_user)
    updated_job_b = server.batch_manager.get_batch_job_by_id(batch_id=job_b.id, actor=default_user)
    # Job A should remain running since its processing_status is "in_progress"
    assert updated_job_a.status == JobStatus.running
    # Job B should be updated to completed
    assert updated_job_b.status == JobStatus.completed
    # Both jobs should have been polled (last_polled_at stamped by the poller)
    assert updated_job_a.last_polled_at is not None
    assert updated_job_b.last_polled_at is not None
    assert updated_job_b.latest_polling_response is not None
    # --- Step 7: Verify batch item status updates ---
    # Item A should remain unchanged (its batch never ended, so no results fetched)
    updated_item_a = server.batch_manager.get_batch_item_by_id(item_a.id, actor=default_user)
    assert updated_item_a.request_status == JobStatus.created
    assert updated_item_a.batch_request_result is None
    # Item B should be marked as completed with a successful result
    updated_item_b = server.batch_manager.get_batch_item_by_id(item_b.id, actor=default_user)
    assert updated_item_b.request_status == JobStatus.completed
    assert updated_item_b.batch_request_result is not None
    # Item C should be marked as failed with an error result
    updated_item_c = server.batch_manager.get_batch_item_by_id(item_c.id, actor=default_user)
    assert updated_item_c.request_status == JobStatus.failed
    assert updated_item_c.batch_request_result is not None
    # --- Step 8: Test idempotency by running polls again ---
    # Save timestamps and response objects to compare later
    job_a_polled_at = updated_job_a.last_polled_at
    job_b_polled_at = updated_job_b.last_polled_at
    job_b_response = updated_job_b.latest_polling_response
    # Save detailed item states
    item_a_status = updated_item_a.request_status
    item_b_status = updated_item_b.request_status
    item_c_status = updated_item_c.request_status
    item_b_result = updated_item_b.batch_request_result
    item_c_result = updated_item_c.batch_request_result
    # Run the polling job again multiple times
    await poll_running_llm_batches(server)
    await poll_running_llm_batches(server)
    await poll_running_llm_batches(server)
    # --- Step 9: Verify that nothing changed for completed jobs ---
    # Refresh all objects from the database
    final_job_a = server.batch_manager.get_batch_job_by_id(batch_id=job_a.id, actor=default_user)
    final_job_b = server.batch_manager.get_batch_job_by_id(batch_id=job_b.id, actor=default_user)
    final_item_a = server.batch_manager.get_batch_item_by_id(item_a.id, actor=default_user)
    final_item_b = server.batch_manager.get_batch_item_by_id(item_b.id, actor=default_user)
    final_item_c = server.batch_manager.get_batch_item_by_id(item_c.id, actor=default_user)
    # Job A should still be polling (last_polled_at should update)
    assert final_job_a.status == JobStatus.running
    assert final_job_a.last_polled_at > job_a_polled_at
    # Job B should remain completed with no status changes
    assert final_job_b.status == JobStatus.completed
    # The completed job should not be polled again
    assert final_job_b.last_polled_at == job_b_polled_at
    assert final_job_b.latest_polling_response == job_b_response
    # All items should maintain their final states
    assert final_item_a.request_status == item_a_status
    assert final_item_b.request_status == item_b_status
    assert final_item_c.request_status == item_c_status
    assert final_item_b.batch_request_result == item_b_result
    assert final_item_c.batch_request_result == item_c_result

View File

@@ -39,7 +39,7 @@ from letta.schemas.agent import AgentStepState, CreateAgent, UpdateAgent
from letta.schemas.block import Block as PydanticBlock
from letta.schemas.block import BlockUpdate, CreateBlock
from letta.schemas.embedding_config import EmbeddingConfig
from letta.schemas.enums import AgentStepStatus, JobStatus, MessageRole
from letta.schemas.enums import AgentStepStatus, JobStatus, MessageRole, ProviderType
from letta.schemas.environment_variables import SandboxEnvironmentVariableCreate, SandboxEnvironmentVariableUpdate
from letta.schemas.file import FileMetadata as PydanticFileMetadata
from letta.schemas.identity import IdentityCreate, IdentityProperty, IdentityPropertyType, IdentityType, IdentityUpdate
@@ -4708,20 +4708,20 @@ def test_list_tags(server: SyncServer, default_user, default_organization):
def test_create_and_get_batch_request(server, default_user, dummy_beta_message_batch):
batch = server.batch_manager.create_batch_request(
llm_provider="anthropic",
llm_provider=ProviderType.anthropic,
status=JobStatus.created,
create_batch_response=dummy_beta_message_batch,
actor=default_user,
)
assert batch.id.startswith("batch_req-")
assert batch.create_batch_response == dummy_beta_message_batch
fetched = server.batch_manager.get_batch_request_by_id(batch.id, actor=default_user)
fetched = server.batch_manager.get_batch_job_by_id(batch.id, actor=default_user)
assert fetched.id == batch.id
def test_update_batch_status(server, default_user, dummy_beta_message_batch):
batch = server.batch_manager.create_batch_request(
llm_provider="anthropic",
llm_provider=ProviderType.anthropic,
status=JobStatus.created,
create_batch_response=dummy_beta_message_batch,
actor=default_user,
@@ -4735,7 +4735,7 @@ def test_update_batch_status(server, default_user, dummy_beta_message_batch):
actor=default_user,
)
updated = server.batch_manager.get_batch_request_by_id(batch.id, actor=default_user)
updated = server.batch_manager.get_batch_job_by_id(batch.id, actor=default_user)
assert updated.status == JobStatus.completed
assert updated.latest_polling_response == dummy_beta_message_batch
assert updated.last_polled_at >= before
@@ -4743,7 +4743,7 @@ def test_update_batch_status(server, default_user, dummy_beta_message_batch):
def test_create_and_get_batch_item(server, default_user, sarah_agent, dummy_beta_message_batch, dummy_llm_config, dummy_step_state):
batch = server.batch_manager.create_batch_request(
llm_provider="anthropic",
llm_provider=ProviderType.anthropic,
status=JobStatus.created,
create_batch_response=dummy_beta_message_batch,
actor=default_user,
@@ -4769,7 +4769,7 @@ def test_update_batch_item(
server, default_user, sarah_agent, dummy_beta_message_batch, dummy_llm_config, dummy_step_state, dummy_successful_response
):
batch = server.batch_manager.create_batch_request(
llm_provider="anthropic",
llm_provider=ProviderType.anthropic,
status=JobStatus.created,
create_batch_response=dummy_beta_message_batch,
actor=default_user,
@@ -4801,7 +4801,7 @@ def test_update_batch_item(
def test_delete_batch_item(server, default_user, sarah_agent, dummy_beta_message_batch, dummy_llm_config, dummy_step_state):
batch = server.batch_manager.create_batch_request(
llm_provider="anthropic",
llm_provider=ProviderType.anthropic,
status=JobStatus.created,
create_batch_response=dummy_beta_message_batch,
actor=default_user,