From 8ddec60d0790e944b2b9a392ebf1da70a5bedfb8 Mon Sep 17 00:00:00 2001 From: cthomas Date: Fri, 16 May 2025 15:18:06 -0700 Subject: [PATCH] feat(asyncify): migrate get active jobs (#2227) --- letta/server/rest_api/routers/v1/jobs.py | 6 ++--- letta/services/job_manager.py | 29 ++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/letta/server/rest_api/routers/v1/jobs.py b/letta/server/rest_api/routers/v1/jobs.py index 8adbdd2d..9c0cba4e 100644 --- a/letta/server/rest_api/routers/v1/jobs.py +++ b/letta/server/rest_api/routers/v1/jobs.py @@ -33,16 +33,16 @@ def list_jobs( @router.get("/active", response_model=List[Job], operation_id="list_active_jobs") -def list_active_jobs( +async def list_active_jobs( server: "SyncServer" = Depends(get_letta_server), actor_id: Optional[str] = Header(None, alias="user_id"), # Extract user_id from header, default to None if not present ): """ List all active jobs. """ - actor = server.user_manager.get_user_or_default(user_id=actor_id) + actor = await server.user_manager.get_actor_or_default_async(actor_id=actor_id) - return server.job_manager.list_jobs(actor=actor, statuses=[JobStatus.created, JobStatus.running]) + return await server.job_manager.list_jobs_async(actor=actor, statuses=[JobStatus.created, JobStatus.running]) @router.get("/{job_id}", response_model=Job, operation_id="retrieve_job") diff --git a/letta/services/job_manager.py b/letta/services/job_manager.py index d279ac90..d3c7ca59 100644 --- a/letta/services/job_manager.py +++ b/letta/services/job_manager.py @@ -150,6 +150,35 @@ class JobManager: ) return [job.to_pydantic() for job in jobs] + @enforce_types + async def list_jobs_async( + self, + actor: PydanticUser, + before: Optional[str] = None, + after: Optional[str] = None, + limit: Optional[int] = 50, + statuses: Optional[List[JobStatus]] = None, + job_type: JobType = JobType.JOB, + ascending: bool = True, + ) -> List[PydanticJob]: + """List all jobs with optional pagination and status filter.""" + async with db_registry.async_session() as session: + filter_kwargs = {"user_id": actor.id, "job_type": job_type} + + # Add status filter if provided + if statuses: + filter_kwargs["status"] = statuses + + jobs = await JobModel.list_async( + db_session=session, + before=before, + after=after, + limit=limit, + ascending=ascending, + **filter_kwargs, + ) + return [job.to_pydantic() for job in jobs] + @enforce_types def delete_job_by_id(self, job_id: str, actor: PydanticUser) -> PydanticJob: """Delete a job by its ID."""