diff --git a/letta/jobs/__init__.py b/letta/jobs/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/letta/jobs/helpers.py b/letta/jobs/helpers.py new file mode 100644 index 00000000..20f4af8f --- /dev/null +++ b/letta/jobs/helpers.py @@ -0,0 +1,25 @@ +from anthropic.types.beta.messages import ( + BetaMessageBatchCanceledResult, + BetaMessageBatchIndividualResponse, + BetaMessageBatchSucceededResult, +) + +from letta.schemas.enums import JobStatus + + +def map_anthropic_batch_job_status_to_job_status(anthropic_status: str) -> JobStatus: + mapping = { + "in_progress": JobStatus.running, + "canceling": JobStatus.cancelled, + "ended": JobStatus.completed, + } + return mapping.get(anthropic_status, JobStatus.pending) # fallback just in case + + +def map_anthropic_individual_batch_item_status_to_job_status(individual_item: BetaMessageBatchIndividualResponse) -> JobStatus: + if isinstance(individual_item.result, BetaMessageBatchSucceededResult): + return JobStatus.completed + elif isinstance(individual_item.result, BetaMessageBatchCanceledResult): + return JobStatus.cancelled + else: + return JobStatus.failed diff --git a/letta/jobs/llm_batch_job_polling.py b/letta/jobs/llm_batch_job_polling.py new file mode 100644 index 00000000..79788aa5 --- /dev/null +++ b/letta/jobs/llm_batch_job_polling.py @@ -0,0 +1,204 @@ +import asyncio +import datetime +from typing import List + +from letta.jobs.helpers import map_anthropic_batch_job_status_to_job_status, map_anthropic_individual_batch_item_status_to_job_status +from letta.jobs.types import BatchId, BatchPollingResult, ItemUpdateInfo +from letta.log import get_logger +from letta.schemas.enums import JobStatus, ProviderType +from letta.schemas.llm_batch_job import LLMBatchJob +from letta.server.server import SyncServer + +logger = get_logger(__name__) + + +class BatchPollingMetrics: + """Class to track metrics for batch polling operations.""" + + def __init__(self): + self.start_time = datetime.datetime.now() + self.total_batches = 0 + self.anthropic_batches = 0 + self.running_count = 0 + self.completed_count = 0 + self.updated_items_count = 0 + + def log_summary(self): + """Log a summary of the metrics collected during polling.""" + elapsed = (datetime.datetime.now() - self.start_time).total_seconds() + logger.info(f"[Poll BatchJob] Finished poll_running_llm_batches job in {elapsed:.2f}s") + logger.info(f"[Poll BatchJob] Found {self.total_batches} running batches total.") + logger.info(f"[Poll BatchJob] Found {self.anthropic_batches} Anthropic batch(es) to poll.") + logger.info(f"[Poll BatchJob] Final results: {self.completed_count} completed, {self.running_count} still running.") + logger.info(f"[Poll BatchJob] Updated {self.updated_items_count} items for newly completed batch(es).") + + +async def fetch_batch_status(server: SyncServer, batch_job: LLMBatchJob) -> BatchPollingResult: + """ + Fetch the current status of a single batch job from the provider. + + Args: + server: The SyncServer instance + batch_job: The batch job to check status for + + Returns: + A tuple containing (batch_id, new_status, polling_response) + """ + batch_id_str = batch_job.create_batch_response.id + try: + response = await server.anthropic_async_client.beta.messages.batches.retrieve(batch_id_str) + new_status = map_anthropic_batch_job_status_to_job_status(response.processing_status) + logger.debug(f"[Poll BatchJob] Batch {batch_job.id}: provider={response.processing_status} → internal={new_status}") + return (batch_job.id, new_status, response) + except Exception as e: + logger.warning(f"[Poll BatchJob] Batch {batch_job.id}: failed to retrieve {batch_id_str}: {e}") + # We treat a retrieval error as still running to try again next cycle + return (batch_job.id, JobStatus.running, None) + + +async def fetch_batch_items(server: SyncServer, batch_id: BatchId, batch_resp_id: str) -> List[ItemUpdateInfo]: + """ + Fetch individual item results for a completed batch. + + Args: + server: The SyncServer instance + batch_id: The internal batch ID + batch_resp_id: The provider's batch response ID + + Returns: + A list of item update information tuples + """ + updates = [] + try: + async for item_result in server.anthropic_async_client.beta.messages.batches.results(batch_resp_id): + # Here, custom_id should be the agent_id + item_status = map_anthropic_individual_batch_item_status_to_job_status(item_result) + updates.append((batch_id, item_result.custom_id, item_status, item_result)) + logger.info(f"[Poll BatchJob] Fetched {len(updates)} item updates for batch {batch_id}.") + except Exception as e: + logger.error(f"[Poll BatchJob] Error fetching item updates for batch {batch_id}: {e}") + + return updates + + +async def poll_batch_updates(server: SyncServer, batch_jobs: List[LLMBatchJob], metrics: BatchPollingMetrics) -> List[BatchPollingResult]: + """ + Poll for updates to multiple batch jobs concurrently. + + Args: + server: The SyncServer instance + batch_jobs: List of batch jobs to poll + metrics: Metrics collection object + + Returns: + List of batch polling results + """ + if not batch_jobs: + logger.info("[Poll BatchJob] No Anthropic batches to update; job complete.") + return [] + + # Create polling tasks for all batch jobs + coros = [fetch_batch_status(server, b) for b in batch_jobs] + results: List[BatchPollingResult] = await asyncio.gather(*coros) + + # Update the server with batch status changes + server.batch_manager.bulk_update_batch_statuses(updates=results) + logger.info(f"[Poll BatchJob] Bulk-updated {len(results)} LLM batch(es) in the DB at job level.") + + return results + + +async def process_completed_batches( + server: SyncServer, batch_results: List[BatchPollingResult], metrics: BatchPollingMetrics +) -> List[ItemUpdateInfo]: + """ + Process batches that have completed and fetch their item results. + + Args: + server: The SyncServer instance + batch_results: Results from polling batch statuses + metrics: Metrics collection object + + Returns: + List of item updates to apply + """ + item_update_tasks = [] + + # Process each top-level polling result + for batch_id, new_status, maybe_batch_resp in batch_results: + if not maybe_batch_resp: + if new_status == JobStatus.running: + metrics.running_count += 1 + logger.warning(f"[Poll BatchJob] Batch {batch_id}: JobStatus was {new_status} and no batch response was found.") + continue + + if new_status == JobStatus.completed: + metrics.completed_count += 1 + batch_resp_id = maybe_batch_resp.id # The Anthropic-assigned batch ID + # Queue an async call to fetch item results for this batch + item_update_tasks.append(fetch_batch_items(server, batch_id, batch_resp_id)) + elif new_status == JobStatus.running: + metrics.running_count += 1 + + # Launch all item update tasks concurrently + concurrent_results = await asyncio.gather(*item_update_tasks, return_exceptions=True) + + # Flatten and filter the results + item_updates = [] + for result in concurrent_results: + if isinstance(result, Exception): + logger.error(f"[Poll BatchJob] A fetch_batch_items task failed with: {result}") + elif isinstance(result, list): + item_updates.extend(result) + + logger.info(f"[Poll BatchJob] Collected a total of {len(item_updates)} item update(s) from completed batches.") + + return item_updates + + +async def poll_running_llm_batches(server: "SyncServer") -> None: + """ + Cron job to poll all running LLM batch jobs and update their polling responses in bulk. + + Steps: + 1. Fetch currently running batch jobs + 2. Filter Anthropic only + 3. Retrieve updated top-level polling info concurrently + 4. Bulk update LLMBatchJob statuses + 5. For each completed batch, call .results(...) to get item-level results + 6. Bulk update all matching LLMBatchItem records by (batch_id, agent_id) + 7. Log telemetry about success/fail + """ + # Initialize metrics tracking + metrics = BatchPollingMetrics() + + logger.info("[Poll BatchJob] Starting poll_running_llm_batches job") + + try: + # 1. Retrieve running batch jobs + batches = server.batch_manager.list_running_batches() + metrics.total_batches = len(batches) + + # TODO: Expand to more providers + # 2. Filter for Anthropic jobs only + anthropic_batch_jobs = [b for b in batches if b.llm_provider == ProviderType.anthropic] + metrics.anthropic_batches = len(anthropic_batch_jobs) + + # 3-4. Poll for batch updates and bulk update statuses + batch_results = await poll_batch_updates(server, anthropic_batch_jobs, metrics) + + # 5. Process completed batches and fetch item results + item_updates = await process_completed_batches(server, batch_results, metrics) + + # 6. Bulk update all items for newly completed batch(es) + if item_updates: + metrics.updated_items_count = len(item_updates) + server.batch_manager.bulk_update_batch_items_by_agent(item_updates) + else: + logger.info("[Poll BatchJob] No item-level updates needed.") + + except Exception as e: + logger.exception("[Poll BatchJob] Unhandled error in poll_running_llm_batches", exc_info=e) + finally: + # 7. Log metrics summary + metrics.log_summary() diff --git a/letta/jobs/scheduler.py b/letta/jobs/scheduler.py new file mode 100644 index 00000000..efbdacea --- /dev/null +++ b/letta/jobs/scheduler.py @@ -0,0 +1,28 @@ +import datetime + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.interval import IntervalTrigger + +from letta.jobs.llm_batch_job_polling import poll_running_llm_batches +from letta.server.server import SyncServer +from letta.settings import settings + +scheduler = AsyncIOScheduler() + + +def start_cron_jobs(server: SyncServer): + """Initialize cron jobs""" + scheduler.add_job( + poll_running_llm_batches, + args=[server], + trigger=IntervalTrigger(seconds=settings.poll_running_llm_batches_interval_seconds), + next_run_time=datetime.datetime.now(datetime.UTC), + id="poll_llm_batches", + name="Poll LLM API batch jobs and update status", + replace_existing=True, + ) + scheduler.start() + + +def shutdown_cron_scheduler(): + scheduler.shutdown() diff --git a/letta/jobs/types.py b/letta/jobs/types.py new file mode 100644 index 00000000..217ee85a --- /dev/null +++ b/letta/jobs/types.py @@ -0,0 +1,10 @@ +from typing import Optional, Tuple + +from anthropic.types.beta.messages import BetaMessageBatch, BetaMessageBatchIndividualResponse + +from letta.schemas.enums import JobStatus + +BatchId = str +AgentId = str +BatchPollingResult = Tuple[BatchId, JobStatus, Optional[BetaMessageBatch]] +ItemUpdateInfo = Tuple[BatchId, AgentId, JobStatus, BetaMessageBatchIndividualResponse] diff --git a/letta/schemas/enums.py b/letta/schemas/enums.py index c02e1438..2fd6446f 100644 --- a/letta/schemas/enums.py +++ b/letta/schemas/enums.py @@ -32,6 +32,7 @@ class JobStatus(str, Enum): completed = "completed" failed = "failed" pending = "pending" + cancelled = "cancelled" class AgentStepStatus(str, Enum): diff --git a/letta/server/rest_api/app.py b/letta/server/rest_api/app.py index ddf66a82..f085f499 100644 --- a/letta/server/rest_api/app.py +++ b/letta/server/rest_api/app.py @@ -16,6 +16,7 @@ from starlette.middleware.cors import CORSMiddleware from letta.__init__ import __version__ from letta.constants import ADMIN_PREFIX, API_PREFIX, OPENAI_API_PREFIX from letta.errors import BedrockPermissionError, LettaAgentNotFoundError, LettaUserNotFoundError +from letta.jobs.scheduler import shutdown_cron_scheduler, start_cron_jobs from letta.log import get_logger from letta.orm.errors import DatabaseTimeoutError, ForeignKeyConstraintViolationError, NoResultFound, UniqueConstraintViolationError from letta.schemas.letta_message import create_letta_message_union_schema @@ -144,6 +145,12 @@ def create_application() -> "FastAPI": executor = concurrent.futures.ThreadPoolExecutor(max_workers=settings.event_loop_threadpool_max_workers) loop.set_default_executor(executor) + @app.on_event("startup") + def on_startup(): + global server + + start_cron_jobs(server) + @app.on_event("shutdown") def shutdown_mcp_clients(): global server @@ -159,6 +166,10 @@ def create_application() -> "FastAPI": t.start() t.join() + @app.on_event("shutdown") + def shutdown_scheduler(): + shutdown_cron_scheduler() + @app.exception_handler(Exception) async def generic_error_handler(request: Request, exc: Exception): # Log the actual error for debugging diff --git a/letta/server/server.py b/letta/server/server.py index 3d9aedfe..712a4f93 100644 --- a/letta/server/server.py +++ b/letta/server/server.py @@ -8,6 +8,7 @@ from abc import abstractmethod from datetime import datetime from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from anthropic import AsyncAnthropic from composio.client import Composio from composio.client.collections import ActionModel, AppModel from fastapi import HTTPException @@ -352,6 +353,9 @@ class SyncServer(Server): self._llm_config_cache = {} self._embedding_config_cache = {} + # TODO: Replace this with the Anthropic client we have in house + self.anthropic_async_client = AsyncAnthropic() + def load_agent(self, agent_id: str, actor: User, interface: Union[AgentInterface, None] = None) -> Agent: """Updated method to load agents from persisted storage""" agent_state = self.agent_manager.get_agent_by_id(agent_id=agent_id, actor=actor) diff --git a/letta/services/llm_batch_manager.py b/letta/services/llm_batch_manager.py index b538e549..405fd36e 100644 --- a/letta/services/llm_batch_manager.py +++ b/letta/services/llm_batch_manager.py @@ -1,13 +1,15 @@ import datetime -from typing import Optional +from typing import List, Optional from anthropic.types.beta.messages import BetaMessageBatch, BetaMessageBatchIndividualResponse +from sqlalchemy import tuple_ +from letta.jobs.types import BatchPollingResult, ItemUpdateInfo from letta.log import get_logger from letta.orm.llm_batch_items import LLMBatchItem from letta.orm.llm_batch_job import LLMBatchJob from letta.schemas.agent import AgentStepState -from letta.schemas.enums import AgentStepStatus, JobStatus +from letta.schemas.enums import AgentStepStatus, JobStatus, ProviderType from letta.schemas.llm_batch_job import LLMBatchItem as PydanticLLMBatchItem from letta.schemas.llm_batch_job import LLMBatchJob as PydanticLLMBatchJob from letta.schemas.llm_config import LLMConfig @@ -28,7 +30,7 @@ class LLMBatchManager: @enforce_types def create_batch_request( self, - llm_provider: str, + llm_provider: ProviderType, create_batch_response: BetaMessageBatch, actor: PydanticUser, status: JobStatus = JobStatus.created, @@ -45,7 +47,7 @@ class LLMBatchManager: return batch.to_pydantic() @enforce_types - def get_batch_request_by_id(self, batch_id: str, actor: PydanticUser) -> PydanticLLMBatchJob: + def get_batch_job_by_id(self, batch_id: str, actor: Optional[PydanticUser] = None) -> PydanticLLMBatchJob: """Retrieve a single batch job by ID.""" with self.session_maker() as session: batch = LLMBatchJob.read(db_session=session, identifier=batch_id, actor=actor) @@ -56,7 +58,7 @@ class LLMBatchManager: self, batch_id: str, status: JobStatus, - actor: PydanticUser, + actor: Optional[PydanticUser] = None, latest_polling_response: Optional[BetaMessageBatch] = None, ) -> PydanticLLMBatchJob: """Update a batch job’s status and optionally its polling response.""" @@ -65,7 +67,34 @@ class LLMBatchManager: batch.status = status batch.latest_polling_response = latest_polling_response batch.last_polled_at = datetime.datetime.now(datetime.timezone.utc) - return batch.update(db_session=session, actor=actor).to_pydantic() + batch = batch.update(db_session=session, actor=actor) + return batch.to_pydantic() + + def bulk_update_batch_statuses( + self, + updates: List[BatchPollingResult], + ) -> None: + """ + Efficiently update many LLMBatchJob rows. This is used by the cron jobs. + + `updates` = [(batch_id, new_status, polling_response_or_None), …] + """ + now = datetime.datetime.now(datetime.timezone.utc) + + with self.session_maker() as session: + mappings = [] + for batch_id, status, response in updates: + mappings.append( + { + "id": batch_id, + "status": status, + "latest_polling_response": response, + "last_polled_at": now, + } + ) + + session.bulk_update_mappings(LLMBatchJob, mappings) + session.commit() @enforce_types def delete_batch_request(self, batch_id: str, actor: PydanticUser) -> None: @@ -74,6 +103,18 @@ class LLMBatchManager: batch = LLMBatchJob.read(db_session=session, identifier=batch_id, actor=actor) batch.hard_delete(db_session=session, actor=actor) + @enforce_types + def list_running_batches(self, actor: Optional[PydanticUser] = None) -> List[PydanticLLMBatchJob]: + """Return all running LLM batch jobs, optionally filtered by actor's organization.""" + with self.session_maker() as session: + query = session.query(LLMBatchJob).filter(LLMBatchJob.status == JobStatus.running) + + if actor is not None: + query = query.filter(LLMBatchJob.organization_id == actor.organization_id) + + results = query.all() + return [batch.to_pydantic() for batch in results] + @enforce_types def create_batch_item( self, @@ -131,6 +172,56 @@ class LLMBatchManager: return item.update(db_session=session, actor=actor).to_pydantic() + def bulk_update_batch_items_by_agent( + self, + updates: List[ItemUpdateInfo], + ) -> None: + """ + Efficiently update LLMBatchItem rows by (batch_id, agent_id). + + Args: + updates: List of tuples: + (batch_id, agent_id, new_request_status, batch_request_result) + """ + with self.session_maker() as session: + # For bulk_update_mappings, we need the primary key of each row + # So we must map (batch_id, agent_id) → actual PK (id) + # We'll do it in one DB query using the (batch_id, agent_id) sets + + # 1. Gather the pairs + key_pairs = [(b_id, a_id) for (b_id, a_id, *_rest) in updates] + + # 2. Query items in a single step + items = ( + session.query(LLMBatchItem.id, LLMBatchItem.batch_id, LLMBatchItem.agent_id) + .filter(tuple_(LLMBatchItem.batch_id, LLMBatchItem.agent_id).in_(key_pairs)) + .all() + ) + + # Build a map from (batch_id, agent_id) → PK id + pair_to_pk = {} + for row_id, row_batch_id, row_agent_id in items: + pair_to_pk[(row_batch_id, row_agent_id)] = row_id + + # 3. Construct mappings for the PK-based bulk update + mappings = [] + for batch_id, agent_id, new_status, new_result in updates: + pk_id = pair_to_pk.get((batch_id, agent_id)) + if not pk_id: + # Nonexistent or mismatch → skip + continue + mappings.append( + { + "id": pk_id, + "request_status": new_status, + "batch_request_result": new_result, + } + ) + + if mappings: + session.bulk_update_mappings(LLMBatchItem, mappings) + session.commit() + @enforce_types def delete_batch_item(self, item_id: str, actor: PydanticUser) -> None: """Hard delete a batch item by ID.""" diff --git a/letta/settings.py b/letta/settings.py index 7ddec06b..e5186501 100644 --- a/letta/settings.py +++ b/letta/settings.py @@ -203,6 +203,9 @@ class Settings(BaseSettings): httpx_max_keepalive_connections: int = 500 httpx_keepalive_expiry: float = 120.0 + # cron job parameters + poll_running_llm_batches_interval_seconds: int = 5 * 60 + @property def letta_pg_uri(self) -> str: if self.pg_uri: diff --git a/poetry.lock b/poetry.lock index 5132ea81..b0d1277a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -215,6 +215,33 @@ files = [ {file = "appnope-0.1.4.tar.gz", hash = "sha256:1de3860566df9caf38f01f86f65e0e13e379af54f9e4bee1e66b48f2efffd1ee"}, ] +[[package]] +name = "apscheduler" +version = "3.11.0" +description = "In-process task scheduler with Cron-like capabilities" +optional = false +python-versions = ">=3.8" +files = [ + {file = "APScheduler-3.11.0-py3-none-any.whl", hash = "sha256:fc134ca32e50f5eadcc4938e3a4545ab19131435e851abb40b34d63d5141c6da"}, + {file = "apscheduler-3.11.0.tar.gz", hash = "sha256:4c622d250b0955a65d5d0eb91c33e6d43fd879834bf541e0a18661ae60460133"}, +] + +[package.dependencies] +tzlocal = ">=3.0" + +[package.extras] +doc = ["packaging", "sphinx", "sphinx-rtd-theme (>=1.3.0)"] +etcd = ["etcd3", "protobuf (<=3.21.0)"] +gevent = ["gevent"] +mongodb = ["pymongo (>=3.0)"] +redis = ["redis (>=3.0)"] +rethinkdb = ["rethinkdb (>=2.4.0)"] +sqlalchemy = ["sqlalchemy (>=1.4)"] +test = ["APScheduler[etcd,mongodb,redis,rethinkdb,sqlalchemy,tornado,zookeeper]", "PySide6", "anyio (>=4.5.2)", "gevent", "pytest", "pytz", "twisted"] +tornado = ["tornado (>=4.3)"] +twisted = ["twisted"] +zookeeper = ["kazoo"] + [[package]] name = "argcomplete" version = "3.6.2" @@ -521,10 +548,6 @@ files = [ {file = "Brotli-1.1.0-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:a37b8f0391212d29b3a91a799c8e4a2855e0576911cdfb2515487e30e322253d"}, {file = "Brotli-1.1.0-cp310-cp310-musllinux_1_1_ppc64le.whl", hash = "sha256:e84799f09591700a4154154cab9787452925578841a94321d5ee8fb9a9a328f0"}, {file = "Brotli-1.1.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:f66b5337fa213f1da0d9000bc8dc0cb5b896b726eefd9c6046f699b169c41b9e"}, - {file = "Brotli-1.1.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:5dab0844f2cf82be357a0eb11a9087f70c5430b2c241493fc122bb6f2bb0917c"}, - {file = "Brotli-1.1.0-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:e4fe605b917c70283db7dfe5ada75e04561479075761a0b3866c081d035b01c1"}, - {file = "Brotli-1.1.0-cp310-cp310-musllinux_1_2_ppc64le.whl", hash = "sha256:1e9a65b5736232e7a7f91ff3d02277f11d339bf34099a56cdab6a8b3410a02b2"}, - {file = "Brotli-1.1.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:58d4b711689366d4a03ac7957ab8c28890415e267f9b6589969e74b6e42225ec"}, {file = "Brotli-1.1.0-cp310-cp310-win32.whl", hash = "sha256:be36e3d172dc816333f33520154d708a2657ea63762ec16b62ece02ab5e4daf2"}, {file = "Brotli-1.1.0-cp310-cp310-win_amd64.whl", hash = "sha256:0c6244521dda65ea562d5a69b9a26120769b7a9fb3db2fe9545935ed6735b128"}, {file = "Brotli-1.1.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:a3daabb76a78f829cafc365531c972016e4aa8d5b4bf60660ad8ecee19df7ccc"}, @@ -537,14 +560,8 @@ files = [ {file = "Brotli-1.1.0-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:19c116e796420b0cee3da1ccec3b764ed2952ccfcc298b55a10e5610ad7885f9"}, {file = "Brotli-1.1.0-cp311-cp311-musllinux_1_1_ppc64le.whl", hash = "sha256:510b5b1bfbe20e1a7b3baf5fed9e9451873559a976c1a78eebaa3b86c57b4265"}, {file = "Brotli-1.1.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:a1fd8a29719ccce974d523580987b7f8229aeace506952fa9ce1d53a033873c8"}, - {file = "Brotli-1.1.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:c247dd99d39e0338a604f8c2b3bc7061d5c2e9e2ac7ba9cc1be5a69cb6cd832f"}, - {file = "Brotli-1.1.0-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:1b2c248cd517c222d89e74669a4adfa5577e06ab68771a529060cf5a156e9757"}, - {file = "Brotli-1.1.0-cp311-cp311-musllinux_1_2_ppc64le.whl", hash = "sha256:2a24c50840d89ded6c9a8fdc7b6ed3692ed4e86f1c4a4a938e1e92def92933e0"}, - {file = "Brotli-1.1.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:f31859074d57b4639318523d6ffdca586ace54271a73ad23ad021acd807eb14b"}, {file = "Brotli-1.1.0-cp311-cp311-win32.whl", hash = "sha256:39da8adedf6942d76dc3e46653e52df937a3c4d6d18fdc94a7c29d263b1f5b50"}, {file = "Brotli-1.1.0-cp311-cp311-win_amd64.whl", hash = "sha256:aac0411d20e345dc0920bdec5548e438e999ff68d77564d5e9463a7ca9d3e7b1"}, - {file = "Brotli-1.1.0-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:32d95b80260d79926f5fab3c41701dbb818fde1c9da590e77e571eefd14abe28"}, - {file = "Brotli-1.1.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:b760c65308ff1e462f65d69c12e4ae085cff3b332d894637f6273a12a482d09f"}, {file = "Brotli-1.1.0-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:316cc9b17edf613ac76b1f1f305d2a748f1b976b033b049a6ecdfd5612c70409"}, {file = "Brotli-1.1.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:caf9ee9a5775f3111642d33b86237b05808dafcd6268faa492250e9b78046eb2"}, {file = "Brotli-1.1.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:70051525001750221daa10907c77830bc889cb6d865cc0b813d9db7fefc21451"}, @@ -555,24 +572,8 @@ files = [ {file = "Brotli-1.1.0-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:4093c631e96fdd49e0377a9c167bfd75b6d0bad2ace734c6eb20b348bc3ea180"}, {file = "Brotli-1.1.0-cp312-cp312-musllinux_1_1_ppc64le.whl", hash = "sha256:7e4c4629ddad63006efa0ef968c8e4751c5868ff0b1c5c40f76524e894c50248"}, {file = "Brotli-1.1.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:861bf317735688269936f755fa136a99d1ed526883859f86e41a5d43c61d8966"}, - {file = "Brotli-1.1.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:87a3044c3a35055527ac75e419dfa9f4f3667a1e887ee80360589eb8c90aabb9"}, - {file = "Brotli-1.1.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:c5529b34c1c9d937168297f2c1fde7ebe9ebdd5e121297ff9c043bdb2ae3d6fb"}, - {file = "Brotli-1.1.0-cp312-cp312-musllinux_1_2_ppc64le.whl", hash = "sha256:ca63e1890ede90b2e4454f9a65135a4d387a4585ff8282bb72964fab893f2111"}, - {file = "Brotli-1.1.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:e79e6520141d792237c70bcd7a3b122d00f2613769ae0cb61c52e89fd3443839"}, {file = "Brotli-1.1.0-cp312-cp312-win32.whl", hash = "sha256:5f4d5ea15c9382135076d2fb28dde923352fe02951e66935a9efaac8f10e81b0"}, {file = "Brotli-1.1.0-cp312-cp312-win_amd64.whl", hash = "sha256:906bc3a79de8c4ae5b86d3d75a8b77e44404b0f4261714306e3ad248d8ab0951"}, - {file = "Brotli-1.1.0-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:8bf32b98b75c13ec7cf774164172683d6e7891088f6316e54425fde1efc276d5"}, - {file = "Brotli-1.1.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:7bc37c4d6b87fb1017ea28c9508b36bbcb0c3d18b4260fcdf08b200c74a6aee8"}, - {file = "Brotli-1.1.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3c0ef38c7a7014ffac184db9e04debe495d317cc9c6fb10071f7fefd93100a4f"}, - {file = "Brotli-1.1.0-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:91d7cc2a76b5567591d12c01f019dd7afce6ba8cba6571187e21e2fc418ae648"}, - {file = "Brotli-1.1.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a93dde851926f4f2678e704fadeb39e16c35d8baebd5252c9fd94ce8ce68c4a0"}, - {file = "Brotli-1.1.0-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f0db75f47be8b8abc8d9e31bc7aad0547ca26f24a54e6fd10231d623f183d089"}, - {file = "Brotli-1.1.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:6967ced6730aed543b8673008b5a391c3b1076d834ca438bbd70635c73775368"}, - {file = "Brotli-1.1.0-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:7eedaa5d036d9336c95915035fb57422054014ebdeb6f3b42eac809928e40d0c"}, - {file = "Brotli-1.1.0-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:d487f5432bf35b60ed625d7e1b448e2dc855422e87469e3f450aa5552b0eb284"}, - {file = "Brotli-1.1.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:832436e59afb93e1836081a20f324cb185836c617659b07b129141a8426973c7"}, - {file = "Brotli-1.1.0-cp313-cp313-win32.whl", hash = "sha256:43395e90523f9c23a3d5bdf004733246fba087f2948f87ab28015f12359ca6a0"}, - {file = "Brotli-1.1.0-cp313-cp313-win_amd64.whl", hash = "sha256:9011560a466d2eb3f5a6e4929cf4a09be405c64154e12df0dd72713f6500e32b"}, {file = "Brotli-1.1.0-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:a090ca607cbb6a34b0391776f0cb48062081f5f60ddcce5d11838e67a01928d1"}, {file = "Brotli-1.1.0-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2de9d02f5bda03d27ede52e8cfe7b865b066fa49258cbab568720aa5be80a47d"}, {file = "Brotli-1.1.0-cp36-cp36m-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:2333e30a5e00fe0fe55903c8832e08ee9c3b1382aacf4db26664a16528d51b4b"}, @@ -582,10 +583,6 @@ files = [ {file = "Brotli-1.1.0-cp36-cp36m-musllinux_1_1_i686.whl", hash = "sha256:fd5f17ff8f14003595ab414e45fce13d073e0762394f957182e69035c9f3d7c2"}, {file = "Brotli-1.1.0-cp36-cp36m-musllinux_1_1_ppc64le.whl", hash = "sha256:069a121ac97412d1fe506da790b3e69f52254b9df4eb665cd42460c837193354"}, {file = "Brotli-1.1.0-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = "sha256:e93dfc1a1165e385cc8239fab7c036fb2cd8093728cbd85097b284d7b99249a2"}, - {file = "Brotli-1.1.0-cp36-cp36m-musllinux_1_2_aarch64.whl", hash = "sha256:aea440a510e14e818e67bfc4027880e2fb500c2ccb20ab21c7a7c8b5b4703d75"}, - {file = "Brotli-1.1.0-cp36-cp36m-musllinux_1_2_i686.whl", hash = "sha256:6974f52a02321b36847cd19d1b8e381bf39939c21efd6ee2fc13a28b0d99348c"}, - {file = "Brotli-1.1.0-cp36-cp36m-musllinux_1_2_ppc64le.whl", hash = "sha256:a7e53012d2853a07a4a79c00643832161a910674a893d296c9f1259859a289d2"}, - {file = "Brotli-1.1.0-cp36-cp36m-musllinux_1_2_x86_64.whl", hash = "sha256:d7702622a8b40c49bffb46e1e3ba2e81268d5c04a34f460978c6b5517a34dd52"}, {file = "Brotli-1.1.0-cp36-cp36m-win32.whl", hash = "sha256:a599669fd7c47233438a56936988a2478685e74854088ef5293802123b5b2460"}, {file = "Brotli-1.1.0-cp36-cp36m-win_amd64.whl", hash = "sha256:d143fd47fad1db3d7c27a1b1d66162e855b5d50a89666af46e1679c496e8e579"}, {file = "Brotli-1.1.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:11d00ed0a83fa22d29bc6b64ef636c4552ebafcef57154b4ddd132f5638fbd1c"}, @@ -597,10 +594,6 @@ files = [ {file = "Brotli-1.1.0-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:919e32f147ae93a09fe064d77d5ebf4e35502a8df75c29fb05788528e330fe74"}, {file = "Brotli-1.1.0-cp37-cp37m-musllinux_1_1_ppc64le.whl", hash = "sha256:23032ae55523cc7bccb4f6a0bf368cd25ad9bcdcc1990b64a647e7bbcce9cb5b"}, {file = "Brotli-1.1.0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:224e57f6eac61cc449f498cc5f0e1725ba2071a3d4f48d5d9dffba42db196438"}, - {file = "Brotli-1.1.0-cp37-cp37m-musllinux_1_2_aarch64.whl", hash = "sha256:cb1dac1770878ade83f2ccdf7d25e494f05c9165f5246b46a621cc849341dc01"}, - {file = "Brotli-1.1.0-cp37-cp37m-musllinux_1_2_i686.whl", hash = "sha256:3ee8a80d67a4334482d9712b8e83ca6b1d9bc7e351931252ebef5d8f7335a547"}, - {file = "Brotli-1.1.0-cp37-cp37m-musllinux_1_2_ppc64le.whl", hash = "sha256:5e55da2c8724191e5b557f8e18943b1b4839b8efc3ef60d65985bcf6f587dd38"}, - {file = "Brotli-1.1.0-cp37-cp37m-musllinux_1_2_x86_64.whl", hash = "sha256:d342778ef319e1026af243ed0a07c97acf3bad33b9f29e7ae6a1f68fd083e90c"}, {file = "Brotli-1.1.0-cp37-cp37m-win32.whl", hash = "sha256:587ca6d3cef6e4e868102672d3bd9dc9698c309ba56d41c2b9c85bbb903cdb95"}, {file = "Brotli-1.1.0-cp37-cp37m-win_amd64.whl", hash = "sha256:2954c1c23f81c2eaf0b0717d9380bd348578a94161a65b3a2afc62c86467dd68"}, {file = "Brotli-1.1.0-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:efa8b278894b14d6da122a72fefcebc28445f2d3f880ac59d46c90f4c13be9a3"}, @@ -613,10 +606,6 @@ files = [ {file = "Brotli-1.1.0-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:1ab4fbee0b2d9098c74f3057b2bc055a8bd92ccf02f65944a241b4349229185a"}, {file = "Brotli-1.1.0-cp38-cp38-musllinux_1_1_ppc64le.whl", hash = "sha256:141bd4d93984070e097521ed07e2575b46f817d08f9fa42b16b9b5f27b5ac088"}, {file = "Brotli-1.1.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:fce1473f3ccc4187f75b4690cfc922628aed4d3dd013d047f95a9b3919a86596"}, - {file = "Brotli-1.1.0-cp38-cp38-musllinux_1_2_aarch64.whl", hash = "sha256:d2b35ca2c7f81d173d2fadc2f4f31e88cc5f7a39ae5b6db5513cf3383b0e0ec7"}, - {file = "Brotli-1.1.0-cp38-cp38-musllinux_1_2_i686.whl", hash = "sha256:af6fa6817889314555aede9a919612b23739395ce767fe7fcbea9a80bf140fe5"}, - {file = "Brotli-1.1.0-cp38-cp38-musllinux_1_2_ppc64le.whl", hash = "sha256:2feb1d960f760a575dbc5ab3b1c00504b24caaf6986e2dc2b01c09c87866a943"}, - {file = "Brotli-1.1.0-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:4410f84b33374409552ac9b6903507cdb31cd30d2501fc5ca13d18f73548444a"}, {file = "Brotli-1.1.0-cp38-cp38-win32.whl", hash = "sha256:db85ecf4e609a48f4b29055f1e144231b90edc90af7481aa731ba2d059226b1b"}, {file = "Brotli-1.1.0-cp38-cp38-win_amd64.whl", hash = "sha256:3d7954194c36e304e1523f55d7042c59dc53ec20dd4e9ea9d151f1b62b4415c0"}, {file = "Brotli-1.1.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:5fb2ce4b8045c78ebbc7b8f3c15062e435d47e7393cc57c25115cfd49883747a"}, @@ -629,10 +618,6 @@ files = [ {file = "Brotli-1.1.0-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:949f3b7c29912693cee0afcf09acd6ebc04c57af949d9bf77d6101ebb61e388c"}, {file = "Brotli-1.1.0-cp39-cp39-musllinux_1_1_ppc64le.whl", hash = "sha256:89f4988c7203739d48c6f806f1e87a1d96e0806d44f0fba61dba81392c9e474d"}, {file = "Brotli-1.1.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:de6551e370ef19f8de1807d0a9aa2cdfdce2e85ce88b122fe9f6b2b076837e59"}, - {file = "Brotli-1.1.0-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:0737ddb3068957cf1b054899b0883830bb1fec522ec76b1098f9b6e0f02d9419"}, - {file = "Brotli-1.1.0-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:4f3607b129417e111e30637af1b56f24f7a49e64763253bbc275c75fa887d4b2"}, - {file = "Brotli-1.1.0-cp39-cp39-musllinux_1_2_ppc64le.whl", hash = "sha256:6c6e0c425f22c1c719c42670d561ad682f7bfeeef918edea971a79ac5252437f"}, - {file = "Brotli-1.1.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:494994f807ba0b92092a163a0a283961369a65f6cbe01e8891132b7a320e61eb"}, {file = "Brotli-1.1.0-cp39-cp39-win32.whl", hash = "sha256:f0d8a7a6b5983c2496e364b969f0e526647a06b075d034f3297dc66f3b360c64"}, {file = "Brotli-1.1.0-cp39-cp39-win_amd64.whl", hash = "sha256:cdad5b9014d83ca68c25d2e9444e28e967ef16e80f6b436918c700c117a85467"}, {file = "Brotli-1.1.0.tar.gz", hash = "sha256:81de08ac11bcb85841e440c13611c00b67d3bf82698314928d0b676362546724"}, @@ -1056,9 +1041,9 @@ isort = ">=4.3.21,<6.0" jinja2 = ">=2.10.1,<4.0" packaging = "*" pydantic = [ + {version = ">=1.10.0,<2.0.0 || >2.0.0,<2.0.1 || >2.0.1,<2.4.0 || >2.4.0,<3.0", extras = ["email"], markers = "python_version >= \"3.12\" and python_version < \"4.0\""}, {version = ">=1.10.0,<2.4.0 || >2.4.0,<3.0", extras = ["email"], markers = "python_version >= \"3.11\" and python_version < \"3.12\""}, {version = ">=1.9.0,<2.4.0 || >2.4.0,<3.0", extras = ["email"], markers = "python_version >= \"3.10\" and python_version < \"3.11\""}, - {version = ">=1.10.0,<2.0.0 || >2.0.0,<2.0.1 || >2.0.1,<2.4.0 || >2.4.0,<3.0", extras = ["email"], markers = "python_version >= \"3.12\" and python_version < \"4.0\""}, ] pyyaml = ">=6.0.1" toml = {version = ">=0.10.0,<1.0.0", markers = "python_version < \"3.11\""} @@ -3082,8 +3067,8 @@ psutil = ">=5.9.1" pywin32 = {version = "*", markers = "sys_platform == \"win32\""} pyzmq = ">=25.0.0" requests = [ - {version = ">=2.26.0", markers = "python_version <= \"3.11\""}, {version = ">=2.32.2", markers = "python_version > \"3.11\""}, + {version = ">=2.26.0", markers = "python_version <= \"3.11\""}, ] setuptools = ">=70.0.0" tomli = {version = ">=1.1.0", markers = "python_version < \"3.11\""} @@ -3936,9 +3921,9 @@ files = [ [package.dependencies] numpy = [ + {version = ">=1.26.0", markers = "python_version >= \"3.12\""}, {version = ">=1.23.2", markers = "python_version == \"3.11\""}, {version = ">=1.22.4", markers = "python_version < \"3.11\""}, - {version = ">=1.26.0", markers = "python_version >= \"3.12\""}, ] python-dateutil = ">=2.8.2" pytz = ">=2020.1" @@ -4424,7 +4409,6 @@ files = [ {file = "psycopg2-2.9.10-cp311-cp311-win_amd64.whl", hash = "sha256:0435034157049f6846e95103bd8f5a668788dd913a7c30162ca9503fdf542cb4"}, {file = "psycopg2-2.9.10-cp312-cp312-win32.whl", hash = "sha256:65a63d7ab0e067e2cdb3cf266de39663203d38d6a8ed97f5ca0cb315c73fe067"}, {file = "psycopg2-2.9.10-cp312-cp312-win_amd64.whl", hash = "sha256:4a579d6243da40a7b3182e0430493dbd55950c493d8c68f4eec0b302f6bbf20e"}, - {file = "psycopg2-2.9.10-cp313-cp313-win_amd64.whl", hash = "sha256:91fd603a2155da8d0cfcdbf8ab24a2d54bca72795b90d2a3ed2b6da8d979dee2"}, {file = "psycopg2-2.9.10-cp39-cp39-win32.whl", hash = "sha256:9d5b3b94b79a844a986d029eee38998232451119ad653aea42bb9220a8c5066b"}, {file = "psycopg2-2.9.10-cp39-cp39-win_amd64.whl", hash = "sha256:88138c8dedcbfa96408023ea2b0c369eda40fe5d75002c0964c78f46f11fa442"}, {file = "psycopg2-2.9.10.tar.gz", hash = "sha256:12ec0b40b0273f95296233e8750441339298e6a572f7039da5b260e3c8b60e11"}, @@ -4484,7 +4468,6 @@ files = [ {file = "psycopg2_binary-2.9.10-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:bb89f0a835bcfc1d42ccd5f41f04870c1b936d8507c6df12b7737febc40f0909"}, {file = "psycopg2_binary-2.9.10-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:f0c2d907a1e102526dd2986df638343388b94c33860ff3bbe1384130828714b1"}, {file = "psycopg2_binary-2.9.10-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:f8157bed2f51db683f31306aa497311b560f2265998122abe1dce6428bd86567"}, - {file = "psycopg2_binary-2.9.10-cp313-cp313-win_amd64.whl", hash = "sha256:27422aa5f11fbcd9b18da48373eb67081243662f9b46e6fd07c3eb46e4535142"}, {file = "psycopg2_binary-2.9.10-cp38-cp38-macosx_12_0_x86_64.whl", hash = "sha256:eb09aa7f9cecb45027683bb55aebaaf45a0df8bf6de68801a6afdc7947bb09d4"}, {file = "psycopg2_binary-2.9.10-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b73d6d7f0ccdad7bc43e6d34273f70d587ef62f824d7261c4ae9b8b1b6af90e8"}, {file = "psycopg2_binary-2.9.10-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ce5ab4bf46a211a8e924d307c1b1fcda82368586a19d0a24f8ae166f5c784864"}, @@ -5237,8 +5220,8 @@ grpcio = ">=1.41.0" grpcio-tools = ">=1.41.0" httpx = {version = ">=0.20.0", extras = ["http2"]} numpy = [ - {version = ">=1.21", markers = "python_version >= \"3.10\" and python_version < \"3.12\""}, {version = ">=1.26", markers = "python_version == \"3.12\""}, + {version = ">=1.21", markers = "python_version >= \"3.10\" and python_version < \"3.12\""}, ] portalocker = ">=2.7.0,<3.0.0" pydantic = ">=1.10.8" @@ -6219,6 +6202,23 @@ files = [ {file = "tzdata-2025.2.tar.gz", hash = "sha256:b60a638fcc0daffadf82fe0f57e53d06bdec2f36c4df66280ae79bce6bd6f2b9"}, ] +[[package]] +name = "tzlocal" +version = "5.3.1" +description = "tzinfo object for the local timezone" +optional = false +python-versions = ">=3.9" +files = [ + {file = "tzlocal-5.3.1-py3-none-any.whl", hash = "sha256:eb1a66c3ef5847adf7a834f1be0800581b683b5608e74f86ecbcef8ab91bb85d"}, + {file = "tzlocal-5.3.1.tar.gz", hash = "sha256:cceffc7edecefea1f595541dbd6e990cb1ea3d19bf01b2809f362a03dd7921fd"}, +] + +[package.dependencies] +tzdata = {version = "*", markers = "platform_system == \"Windows\""} + +[package.extras] +devenv = ["check-manifest", "pytest (>=4.3)", "pytest-cov", "pytest-mock (>=3.3)", "zest.releaser"] + [[package]] name = "urllib3" version = "2.3.0" @@ -6819,4 +6819,4 @@ tests = ["wikipedia"] [metadata] lock-version = "2.0" python-versions = "<3.14,>=3.10" -content-hash = "b104f1fe1ae3c5b2e1261105f1e4170d28fee235551c230b3603bb5494054bf1" +content-hash = "c7532fe22e86ca8602c0b27be85020e0b139ec521cd5b0dc94b180113ede41c4" diff --git a/pyproject.toml b/pyproject.toml index a5eccac8..044c499b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -88,6 +88,7 @@ boto3 = {version = "^1.36.24", optional = true} datamodel-code-generator = {extras = ["http"], version = "^0.25.0"} mcp = "^1.3.0" firecrawl-py = "^1.15.0" +apscheduler = "^3.11.0" [tool.poetry.extras] diff --git a/tests/integration_test_batch_api.py b/tests/integration_test_batch_api.py new file mode 100644 index 00000000..576a9a6c --- /dev/null +++ b/tests/integration_test_batch_api.py @@ -0,0 +1,331 @@ +import os +import threading +import time +from datetime import datetime, timezone +from unittest.mock import AsyncMock + +import pytest +from anthropic.types import BetaErrorResponse, BetaRateLimitError +from anthropic.types.beta import BetaMessage +from anthropic.types.beta.messages import ( + BetaMessageBatch, + BetaMessageBatchErroredResult, + BetaMessageBatchIndividualResponse, + BetaMessageBatchRequestCounts, + BetaMessageBatchSucceededResult, +) +from dotenv import load_dotenv +from letta_client import Letta + +from letta.config import LettaConfig +from letta.helpers import ToolRulesSolver +from letta.jobs.llm_batch_job_polling import poll_running_llm_batches +from letta.orm import Base +from letta.schemas.agent import AgentStepState +from letta.schemas.enums import JobStatus, ProviderType +from letta.schemas.llm_config import LLMConfig +from letta.schemas.tool_rule import InitToolRule +from letta.server.db import db_context +from letta.server.server import SyncServer + +# --- Server and Database Management --- # + + +@pytest.fixture(autouse=True) +def _clear_tables(): + with db_context() as session: + for table in reversed(Base.metadata.sorted_tables): # Reverse to avoid FK issues + if table.name in {"llm_batch_job", "llm_batch_items"}: + session.execute(table.delete()) # Truncate table + session.commit() + + +def _run_server(): + """Starts the Letta server in a background thread.""" + load_dotenv() + from letta.server.rest_api.app import start_server + + start_server(debug=True) + + +@pytest.fixture(scope="session") +def server_url(): + """Ensures a server is running and returns its base URL.""" + url = os.getenv("LETTA_SERVER_URL", "http://localhost:8283") + + if not os.getenv("LETTA_SERVER_URL"): + thread = threading.Thread(target=_run_server, daemon=True) + thread.start() + time.sleep(5) # Allow server startup time + + return url + + +@pytest.fixture(scope="module") +def server(): + config = LettaConfig.load() + print("CONFIG PATH", config.config_path) + config.save() + return SyncServer() + + +@pytest.fixture(scope="session") +def client(server_url): + """Creates a REST client for testing.""" + return Letta(base_url=server_url) + + +# --- Dummy Response Factories --- # + + +def create_batch_response(batch_id: str, processing_status: str = "in_progress") -> BetaMessageBatch: + """Create a dummy BetaMessageBatch with the specified ID and status.""" + now = datetime(2024, 8, 20, 18, 37, 24, 100435, tzinfo=timezone.utc) + return BetaMessageBatch( + id=batch_id, + archived_at=now, + cancel_initiated_at=now, + created_at=now, + ended_at=now, + expires_at=now, + processing_status=processing_status, + request_counts=BetaMessageBatchRequestCounts( + canceled=10, + errored=30, + expired=10, + processing=100, + succeeded=50, + ), + results_url=None, + type="message_batch", + ) + + +def create_successful_response(custom_id: str) -> BetaMessageBatchIndividualResponse: + """Create a dummy successful batch response.""" + return BetaMessageBatchIndividualResponse( + custom_id=custom_id, + result=BetaMessageBatchSucceededResult( + type="succeeded", + message=BetaMessage( + id="msg_abc123", + role="assistant", + type="message", + model="claude-3-5-sonnet-20240620", + content=[{"type": "text", "text": "hi!"}], + usage={"input_tokens": 5, "output_tokens": 7}, + stop_reason="end_turn", + ), + ), + ) + + +def create_failed_response(custom_id: str) -> BetaMessageBatchIndividualResponse: + """Create a dummy failed batch response with a rate limit error.""" + return BetaMessageBatchIndividualResponse( + custom_id=custom_id, + result=BetaMessageBatchErroredResult( + type="errored", + error=BetaErrorResponse(type="error", error=BetaRateLimitError(type="rate_limit_error", message="Rate limit hit.")), + ), + ) + + +# --- Test Setup Helpers --- # + + +def create_test_agent(client, name, model="anthropic/claude-3-5-sonnet-20241022"): + """Create a test agent with standardized configuration.""" + return client.agents.create( + name=name, + include_base_tools=True, + model=model, + tags=["test_agents"], + embedding="letta/letta-free", + ) + + +def create_test_batch_job(server, batch_response, default_user): + """Create a test batch job with the given batch response.""" + return server.batch_manager.create_batch_request( + llm_provider=ProviderType.anthropic, + create_batch_response=batch_response, + actor=default_user, + status=JobStatus.running, + ) + + +def create_test_batch_item(server, batch_id, agent_id, default_user): + """Create a test batch item for the given batch and agent.""" + dummy_llm_config = LLMConfig( + model="claude-3-7-sonnet-latest", + model_endpoint_type="anthropic", + model_endpoint="https://api.anthropic.com/v1", + context_window=32000, + handle=f"anthropic/claude-3-7-sonnet-latest", + put_inner_thoughts_in_kwargs=True, + max_tokens=4096, + ) + + common_step_state = AgentStepState( + step_number=1, tool_rules_solver=ToolRulesSolver(tool_rules=[InitToolRule(tool_name="send_message")]) + ) + + return server.batch_manager.create_batch_item( + batch_id=batch_id, + agent_id=agent_id, + llm_config=dummy_llm_config, + step_state=common_step_state, + actor=default_user, + ) + + +def mock_anthropic_client(server, batch_a_resp, batch_b_resp, agent_b_id, agent_c_id): + """Set up mocks for the Anthropic client's retrieve and results methods.""" + + # Mock the retrieve method + async def dummy_retrieve(batch_resp_id: str) -> BetaMessageBatch: + if batch_resp_id == batch_a_resp.id: + return batch_a_resp + elif batch_resp_id == batch_b_resp.id: + return batch_b_resp + else: + raise ValueError(f"Unknown batch response id: {batch_resp_id}") + + server.anthropic_async_client.beta.messages.batches.retrieve = AsyncMock(side_effect=dummy_retrieve) + + # Mock the results method + def dummy_results(batch_resp_id: str): + if batch_resp_id == batch_b_resp.id: + + async def generator(): + yield create_successful_response(agent_b_id) + yield create_failed_response(agent_c_id) + + return generator() + else: + raise RuntimeError("This test should never request the results for batch_a.") + + server.anthropic_async_client.beta.messages.batches.results = dummy_results + + +# ----------------------------- +# End-to-End Test +# ----------------------------- +@pytest.mark.asyncio +async def test_polling_mixed_batch_jobs(client, default_user, server): + """ + End-to-end test for polling batch jobs with mixed statuses and idempotency. + + Test scenario: + - Create two batch jobs: + - Job A: Single agent that remains "in_progress" + - Job B: Two agents that complete (one succeeds, one fails) + - Poll jobs and verify: + - Job A remains in "running" state + - Job B moves to "completed" state + - Job B's items reflect appropriate individual success/failure statuses + - Test idempotency: + - Run additional polls and verify: + - Completed job B remains unchanged (no status changes or re-polling) + - In-progress job A continues to be polled + - All batch items maintain their final states + """ + # --- Step 1: Prepare test data --- + # Create batch responses with different statuses + batch_a_resp = create_batch_response("msgbatch_A", processing_status="in_progress") + batch_b_resp = create_batch_response("msgbatch_B", processing_status="ended") + + # Create test agents + agent_a = create_test_agent(client, "agent_a") + agent_b = create_test_agent(client, "agent_b") + agent_c = create_test_agent(client, "agent_c") + + # --- Step 2: Create batch jobs --- + job_a = create_test_batch_job(server, batch_a_resp, default_user) + job_b = create_test_batch_job(server, batch_b_resp, default_user) + + # --- Step 3: Create batch items --- + item_a = create_test_batch_item(server, job_a.id, agent_a.id, default_user) + item_b = create_test_batch_item(server, job_b.id, agent_b.id, default_user) + item_c = create_test_batch_item(server, job_b.id, agent_c.id, default_user) + + # --- Step 4: Mock the Anthropic client --- + mock_anthropic_client(server, batch_a_resp, batch_b_resp, agent_b.id, agent_c.id) + + # --- Step 5: Run the polling job twice (simulating periodic polling) --- + await poll_running_llm_batches(server) + await poll_running_llm_batches(server) + + # --- Step 6: Verify batch job status updates --- + updated_job_a = server.batch_manager.get_batch_job_by_id(batch_id=job_a.id, actor=default_user) + updated_job_b = server.batch_manager.get_batch_job_by_id(batch_id=job_b.id, actor=default_user) + + # Job A should remain running since its processing_status is "in_progress" + assert updated_job_a.status == JobStatus.running + # Job B should be updated to completed + assert updated_job_b.status == JobStatus.completed + + # Both jobs should have been polled + assert updated_job_a.last_polled_at is not None + assert updated_job_b.last_polled_at is not None + assert updated_job_b.latest_polling_response is not None + + # --- Step 7: Verify batch item status updates --- + # Item A should remain unchanged + updated_item_a = server.batch_manager.get_batch_item_by_id(item_a.id, actor=default_user) + assert updated_item_a.request_status == JobStatus.created + assert updated_item_a.batch_request_result is None + + # Item B should be marked as completed with a successful result + updated_item_b = server.batch_manager.get_batch_item_by_id(item_b.id, actor=default_user) + assert updated_item_b.request_status == JobStatus.completed + assert updated_item_b.batch_request_result is not None + + # Item C should be marked as failed with an error result + updated_item_c = server.batch_manager.get_batch_item_by_id(item_c.id, actor=default_user) + assert updated_item_c.request_status == JobStatus.failed + assert updated_item_c.batch_request_result is not None + + # --- Step 8: Test idempotency by running polls again --- + # Save timestamps and response objects to compare later + job_a_polled_at = updated_job_a.last_polled_at + job_b_polled_at = updated_job_b.last_polled_at + job_b_response = updated_job_b.latest_polling_response + + # Save detailed item states + item_a_status = updated_item_a.request_status + item_b_status = updated_item_b.request_status + item_c_status = updated_item_c.request_status + item_b_result = updated_item_b.batch_request_result + item_c_result = updated_item_c.batch_request_result + + # Run the polling job again multiple times + await poll_running_llm_batches(server) + await poll_running_llm_batches(server) + await poll_running_llm_batches(server) + + # --- Step 9: Verify that nothing changed for completed jobs --- + # Refresh all objects + final_job_a = server.batch_manager.get_batch_job_by_id(batch_id=job_a.id, actor=default_user) + final_job_b = server.batch_manager.get_batch_job_by_id(batch_id=job_b.id, actor=default_user) + final_item_a = server.batch_manager.get_batch_item_by_id(item_a.id, actor=default_user) + final_item_b = server.batch_manager.get_batch_item_by_id(item_b.id, actor=default_user) + final_item_c = server.batch_manager.get_batch_item_by_id(item_c.id, actor=default_user) + + # Job A should still be polling (last_polled_at should update) + assert final_job_a.status == JobStatus.running + assert final_job_a.last_polled_at > job_a_polled_at + + # Job B should remain completed with no status changes + assert final_job_b.status == JobStatus.completed + # The completed job should not be polled again + assert final_job_b.last_polled_at == job_b_polled_at + assert final_job_b.latest_polling_response == job_b_response + + # All items should maintain their final states + assert final_item_a.request_status == item_a_status + assert final_item_b.request_status == item_b_status + assert final_item_c.request_status == item_c_status + assert final_item_b.batch_request_result == item_b_result + assert final_item_c.batch_request_result == item_c_result diff --git a/tests/test_managers.py b/tests/test_managers.py index 49373f5d..83a2a0f3 100644 --- a/tests/test_managers.py +++ b/tests/test_managers.py @@ -39,7 +39,7 @@ from letta.schemas.agent import AgentStepState, CreateAgent, UpdateAgent from letta.schemas.block import Block as PydanticBlock from letta.schemas.block import BlockUpdate, CreateBlock from letta.schemas.embedding_config import EmbeddingConfig -from letta.schemas.enums import AgentStepStatus, JobStatus, MessageRole +from letta.schemas.enums import AgentStepStatus, JobStatus, MessageRole, ProviderType from letta.schemas.environment_variables import SandboxEnvironmentVariableCreate, SandboxEnvironmentVariableUpdate from letta.schemas.file import FileMetadata as PydanticFileMetadata from letta.schemas.identity import IdentityCreate, IdentityProperty, IdentityPropertyType, IdentityType, IdentityUpdate @@ -4708,20 +4708,20 @@ def test_list_tags(server: SyncServer, default_user, default_organization): def test_create_and_get_batch_request(server, default_user, dummy_beta_message_batch): batch = server.batch_manager.create_batch_request( - llm_provider="anthropic", + llm_provider=ProviderType.anthropic, status=JobStatus.created, create_batch_response=dummy_beta_message_batch, actor=default_user, ) assert batch.id.startswith("batch_req-") assert batch.create_batch_response == dummy_beta_message_batch - fetched = server.batch_manager.get_batch_request_by_id(batch.id, actor=default_user) + fetched = server.batch_manager.get_batch_job_by_id(batch.id, actor=default_user) assert fetched.id == batch.id def test_update_batch_status(server, default_user, dummy_beta_message_batch): batch = server.batch_manager.create_batch_request( - llm_provider="anthropic", + llm_provider=ProviderType.anthropic, status=JobStatus.created, create_batch_response=dummy_beta_message_batch, actor=default_user, @@ -4735,7 +4735,7 @@ def test_update_batch_status(server, default_user, dummy_beta_message_batch): actor=default_user, ) - updated = server.batch_manager.get_batch_request_by_id(batch.id, actor=default_user) + updated = server.batch_manager.get_batch_job_by_id(batch.id, actor=default_user) assert updated.status == JobStatus.completed assert updated.latest_polling_response == dummy_beta_message_batch assert updated.last_polled_at >= before @@ -4743,7 +4743,7 @@ def test_update_batch_status(server, default_user, dummy_beta_message_batch): def test_create_and_get_batch_item(server, default_user, sarah_agent, dummy_beta_message_batch, dummy_llm_config, dummy_step_state): batch = server.batch_manager.create_batch_request( - llm_provider="anthropic", + llm_provider=ProviderType.anthropic, status=JobStatus.created, create_batch_response=dummy_beta_message_batch, actor=default_user, @@ -4769,7 +4769,7 @@ def test_update_batch_item( server, default_user, sarah_agent, dummy_beta_message_batch, dummy_llm_config, dummy_step_state, dummy_successful_response ): batch = server.batch_manager.create_batch_request( - llm_provider="anthropic", + llm_provider=ProviderType.anthropic, status=JobStatus.created, create_batch_response=dummy_beta_message_batch, actor=default_user, @@ -4801,7 +4801,7 @@ def test_update_batch_item( def test_delete_batch_item(server, default_user, sarah_agent, dummy_beta_message_batch, dummy_llm_config, dummy_step_state): batch = server.batch_manager.create_batch_request( - llm_provider="anthropic", + llm_provider=ProviderType.anthropic, status=JobStatus.created, create_batch_response=dummy_beta_message_batch, actor=default_user,