diff --git a/letta/jobs/llm_batch_job_polling.py b/letta/jobs/llm_batch_job_polling.py index 7651c283..6e0ba172 100644 --- a/letta/jobs/llm_batch_job_polling.py +++ b/letta/jobs/llm_batch_job_polling.py @@ -181,7 +181,9 @@ async def poll_running_llm_batches(server: "SyncServer") -> List[LettaBatchRespo try: # 1. Retrieve running batch jobs - batches = await server.batch_manager.list_running_llm_batches_async(weeks=max(settings.batch_job_polling_lookback_weeks, 1)) + batches = await server.batch_manager.list_running_llm_batches_async( + weeks=max(settings.batch_job_polling_lookback_weeks, 1), batch_size=settings.batch_job_polling_batch_size + ) metrics.total_batches = len(batches) # TODO: Expand to more providers diff --git a/letta/services/llm_batch_manager.py b/letta/services/llm_batch_manager.py index c9071eba..eeff0673 100644 --- a/letta/services/llm_batch_manager.py +++ b/letta/services/llm_batch_manager.py @@ -206,7 +206,7 @@ class LLMBatchManager: @enforce_types @trace_method async def list_running_llm_batches_async( - self, actor: Optional[PydanticUser] = None, weeks: Optional[int] = None + self, actor: Optional[PydanticUser] = None, weeks: Optional[int] = None, batch_size: Optional[int] = None ) -> List[PydanticLLMBatchJob]: """Return all running LLM batch jobs, optionally filtered by actor's organization and recent weeks.""" async with db_registry.async_session() as session: @@ -219,6 +219,9 @@ class LLMBatchManager: cutoff_datetime = datetime.datetime.utcnow() - datetime.timedelta(weeks=weeks) query = query.where(LLMBatchJob.created_at >= cutoff_datetime) + if batch_size is not None: + query = query.limit(batch_size) + results = await session.execute(query) return [batch.to_pydantic() for batch in results.scalars().all()] diff --git a/letta/settings.py b/letta/settings.py index e337ee70..6cb193f5 100644 --- a/letta/settings.py +++ b/letta/settings.py @@ -233,6 +233,7 @@ class Settings(BaseSettings): poll_running_llm_batches_interval_seconds: int = 5 * 60 poll_lock_retry_interval_seconds: int = 5 * 60 batch_job_polling_lookback_weeks: int = 2 + batch_job_polling_batch_size: Optional[int] = None # for OCR mistral_api_key: Optional[str] = None