From 0f69d503aa53be6f63ed1eac2aeb786456d140fb Mon Sep 17 00:00:00 2001 From: Matthew Zhou Date: Wed, 16 Jul 2025 15:43:32 -0700 Subject: [PATCH] fix: Move callback execution out of the async session (#3360) --- letta/services/job_manager.py | 169 ++++++++++++++++++++++------------ 1 file changed, 110 insertions(+), 59 deletions(-) diff --git a/letta/services/job_manager.py b/letta/services/job_manager.py index dbba4a91..afd26b80 100644 --- a/letta/services/job_manager.py +++ b/letta/services/job_manager.py @@ -67,8 +67,20 @@ class JobManager: @trace_method def update_job_by_id(self, job_id: str, job_update: JobUpdate, actor: PydanticUser) -> PydanticJob: """Update a job by its ID with the given JobUpdate object.""" + # First check if we need to dispatch a callback + needs_callback = False + callback_url = None + with db_registry.session() as session: + job = self._verify_job_access(session=session, job_id=job_id, actor=actor, access=["write"]) + not_completed_before = not bool(job.completed_at) + + # Check if we'll need to dispatch callback + if job_update.status in {JobStatus.completed, JobStatus.failed} and not_completed_before and job.callback_url: + needs_callback = True + callback_url = job.callback_url + + # Update the job first to get the final metadata with db_registry.session() as session: - # Fetch the job by ID job = self._verify_job_access(session=session, job_id=job_id, actor=actor, access=["write"]) not_completed_before = not bool(job.completed_at) @@ -84,20 +96,52 @@ class JobManager: if job_update.status in {JobStatus.completed, JobStatus.failed} and not_completed_before: job.completed_at = get_utc_time().replace(tzinfo=None) - if job.callback_url: - self._dispatch_callback(job) - # Save the updated job to the database - job.update(db_session=session, actor=actor) + # Save the updated job to the database first + job = job.update(db_session=session, actor=actor) - return job.to_pydantic() + # Get the updated metadata for callback + final_metadata = job.metadata_ + result = job.to_pydantic() + + # Dispatch callback outside of database session if needed + if needs_callback: + callback_info = { + "job_id": job_id, + "callback_url": callback_url, + "status": job_update.status, + "completed_at": get_utc_time().replace(tzinfo=None), + "metadata": final_metadata, + } + callback_result = self._dispatch_callback_sync(callback_info) + + # Update callback status in a separate transaction + with db_registry.session() as session: + job = self._verify_job_access(session=session, job_id=job_id, actor=actor, access=["write"]) + job.callback_sent_at = callback_result["callback_sent_at"] + job.callback_status_code = callback_result.get("callback_status_code") + job.callback_error = callback_result.get("callback_error") + job.update(db_session=session, actor=actor) + result = job.to_pydantic() + + return result @enforce_types @trace_method async def update_job_by_id_async(self, job_id: str, job_update: JobUpdate, actor: PydanticUser) -> PydanticJob: """Update a job by its ID with the given JobUpdate object asynchronously.""" - callback_func = None + # First check if we need to dispatch a callback + needs_callback = False + callback_url = None + async with db_registry.async_session() as session: + job = await self._verify_job_access_async(session=session, job_id=job_id, actor=actor, access=["write"]) + # Check if we'll need to dispatch callback + if job_update.status in {JobStatus.completed, JobStatus.failed} and job.callback_url: + needs_callback = True + callback_url = job.callback_url + + # Update the job first to get the final metadata async with db_registry.async_session() as session: # Fetch the job by ID job = await self._verify_job_access_async(session=session, job_id=job_id, actor=actor, access=["write"]) @@ -116,25 +160,35 @@ class JobManager: if job_update.status in {JobStatus.completed, JobStatus.failed}: logger.info(f"Current job completed at: {job.completed_at}") job.completed_at = get_utc_time().replace(tzinfo=None) - if job.callback_url: - callback_func = self._dispatch_callback_async( - callback_url=job.callback_url, - payload={ - "job_id": job.id, - "status": job.status, - "completed_at": job.completed_at.isoformat() if job.completed_at else None, - "metadata": job.metadata_, - }, - actor=actor, - ) - # Save the updated job to the database - await job.update_async(db_session=session, actor=actor) + # Save the updated job to the database first + job = await job.update_async(db_session=session, actor=actor) - if callback_func: - return await callback_func + # Get the updated metadata for callback + final_metadata = job.metadata_ + result = job.to_pydantic() - return job.to_pydantic() + # Dispatch callback outside of database session if needed + if needs_callback: + callback_info = { + "job_id": job_id, + "callback_url": callback_url, + "status": job_update.status, + "completed_at": get_utc_time().replace(tzinfo=None), + "metadata": final_metadata, + } + callback_result = await self._dispatch_callback_async(callback_info) + + # Update callback status in a separate transaction + async with db_registry.async_session() as session: + job = await self._verify_job_access_async(session=session, job_id=job_id, actor=actor, access=["write"]) + job.callback_sent_at = callback_result["callback_sent_at"] + job.callback_status_code = callback_result.get("callback_status_code") + job.callback_error = callback_result.get("callback_error") + await job.update_async(db_session=session, actor=actor) + result = job.to_pydantic() + + return result @enforce_types @trace_method @@ -617,7 +671,7 @@ class JobManager: session: Session, job_id: str, actor: PydanticUser, - access: List[Literal["read", "write", "delete"]] = ["read"], + access: List[Literal["read", "write", "admin"]] = ["read"], ) -> JobModel: """ Verify that a job exists and the user has the required access. @@ -685,61 +739,58 @@ class JobManager: return request_config @trace_method - def _dispatch_callback(self, job: JobModel) -> None: + def _dispatch_callback_sync(self, callback_info: dict) -> dict: """ - POST a standard JSON payload to job.callback_url - and record timestamp + HTTP status. + POST a standard JSON payload to callback_url and return callback status. """ - payload = { - "job_id": job.id, - "status": job.status, - "completed_at": job.completed_at.isoformat() if job.completed_at else None, - "metadata": job.metadata_, + "job_id": callback_info["job_id"], + "status": callback_info["status"], + "completed_at": callback_info["completed_at"].isoformat() if callback_info["completed_at"] else None, + "metadata": callback_info["metadata"], } + + callback_sent_at = get_utc_time().replace(tzinfo=None) + result = {"callback_sent_at": callback_sent_at} + try: log_event("POST callback dispatched", payload) - resp = post(job.callback_url, json=payload, timeout=5.0) + resp = post(callback_info["callback_url"], json=payload, timeout=5.0) log_event("POST callback finished") - job.callback_sent_at = get_utc_time().replace(tzinfo=None) - job.callback_status_code = resp.status_code - + result["callback_status_code"] = resp.status_code except Exception as e: - error_message = f"Failed to dispatch callback for job {job.id} to {job.callback_url}: {e!s}" + error_message = f"Failed to dispatch callback for job {callback_info['job_id']} to {callback_info['callback_url']}: {e!s}" logger.error(error_message) - # Record the failed attempt - job.callback_sent_at = get_utc_time().replace(tzinfo=None) - job.callback_error = error_message + result["callback_error"] = error_message # Continue silently - callback failures should not affect job completion + return result + @trace_method - async def _dispatch_callback_async(self, callback_url: str, payload: dict, actor: PydanticUser) -> PydanticJob: + async def _dispatch_callback_async(self, callback_info: dict) -> dict: """ - POST a standard JSON payload to job.callback_url and record timestamp + HTTP status asynchronously. + POST a standard JSON payload to callback_url and return callback status asynchronously. """ - job_id = payload["job_id"] - callback_sent_at, callback_status_code, callback_error = None, None, None + payload = { + "job_id": callback_info["job_id"], + "status": callback_info["status"], + "completed_at": callback_info["completed_at"].isoformat() if callback_info["completed_at"] else None, + "metadata": callback_info["metadata"], + } + + callback_sent_at = get_utc_time().replace(tzinfo=None) + result = {"callback_sent_at": callback_sent_at} try: async with AsyncClient() as client: log_event("POST callback dispatched", payload) - resp = await client.post(callback_url, json=payload, timeout=5.0) + resp = await client.post(callback_info["callback_url"], json=payload, timeout=5.0) log_event("POST callback finished") - # Ensure timestamp is timezone-naive for DB compatibility - callback_sent_at = get_utc_time().replace(tzinfo=None) - callback_status_code = resp.status_code + result["callback_status_code"] = resp.status_code except Exception as e: - error_message = f"Failed to dispatch callback for job {job_id} to {callback_url}: {e!s}" + error_message = f"Failed to dispatch callback for job {callback_info['job_id']} to {callback_info['callback_url']}: {e!s}" logger.error(error_message) - # Record the failed attempt - callback_sent_at = get_utc_time().replace(tzinfo=None) - callback_error = error_message + result["callback_error"] = error_message # Continue silently - callback failures should not affect job completion - async with db_registry.async_session() as session: - job = await JobModel.read_async(db_session=session, identifier=job_id, actor=actor, access_type=AccessType.USER) - job.callback_sent_at = callback_sent_at - job.callback_status_code = callback_status_code - job.callback_error = callback_error - await job.update_async(db_session=session, actor=actor) - return job.to_pydantic() + return result